1""" 2BitBake 'RunQueue' implementation 3 4Handles preparation and execution of a queue of tasks 5""" 6 7# Copyright (C) 2006-2007 Richard Purdie 8# 9# SPDX-License-Identifier: GPL-2.0-only 10# 11 12import copy 13import os 14import sys 15import stat 16import errno 17import logging 18import re 19import bb 20from bb import msg, event 21from bb import monitordisk 22import subprocess 23import pickle 24from multiprocessing import Process 25import shlex 26import pprint 27import time 28 29bblogger = logging.getLogger("BitBake") 30logger = logging.getLogger("BitBake.RunQueue") 31hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv") 32 33__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' ) 34 35def fn_from_tid(tid): 36 return tid.rsplit(":", 1)[0] 37 38def taskname_from_tid(tid): 39 return tid.rsplit(":", 1)[1] 40 41def mc_from_tid(tid): 42 if tid.startswith('mc:') and tid.count(':') >= 2: 43 return tid.split(':')[1] 44 return "" 45 46def split_tid(tid): 47 (mc, fn, taskname, _) = split_tid_mcfn(tid) 48 return (mc, fn, taskname) 49 50def split_mc(n): 51 if n.startswith("mc:") and n.count(':') >= 2: 52 _, mc, n = n.split(":", 2) 53 return (mc, n) 54 return ('', n) 55 56def split_tid_mcfn(tid): 57 if tid.startswith('mc:') and tid.count(':') >= 2: 58 elems = tid.split(':') 59 mc = elems[1] 60 fn = ":".join(elems[2:-1]) 61 taskname = elems[-1] 62 mcfn = "mc:" + mc + ":" + fn 63 else: 64 tid = tid.rsplit(":", 1) 65 mc = "" 66 fn = tid[0] 67 taskname = tid[1] 68 mcfn = fn 69 70 return (mc, fn, taskname, mcfn) 71 72def build_tid(mc, fn, taskname): 73 if mc: 74 return "mc:" + mc + ":" + fn + ":" + taskname 75 return fn + ":" + taskname 76 77# Index used to pair up potentially matching multiconfig tasks 78# We match on PN, taskname and hash being equal 79def pending_hash_index(tid, rqdata): 80 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 81 pn = rqdata.dataCaches[mc].pkg_fn[taskfn] 82 h = rqdata.runtaskentries[tid].unihash 83 return pn + ":" + "taskname" + h 84 85class RunQueueStats: 86 """ 87 Holds statistics on the tasks handled by the associated runQueue 88 """ 89 def __init__(self, total, setscene_total): 90 self.completed = 0 91 self.skipped = 0 92 self.failed = 0 93 self.active = 0 94 self.setscene_active = 0 95 self.setscene_covered = 0 96 self.setscene_notcovered = 0 97 self.setscene_total = setscene_total 98 self.total = total 99 100 def copy(self): 101 obj = self.__class__(self.total, self.setscene_total) 102 obj.__dict__.update(self.__dict__) 103 return obj 104 105 def taskFailed(self): 106 self.active = self.active - 1 107 self.failed = self.failed + 1 108 109 def taskCompleted(self): 110 self.active = self.active - 1 111 self.completed = self.completed + 1 112 113 def taskSkipped(self): 114 self.active = self.active + 1 115 self.skipped = self.skipped + 1 116 117 def taskActive(self): 118 self.active = self.active + 1 119 120 def updateCovered(self, covered, notcovered): 121 self.setscene_covered = covered 122 self.setscene_notcovered = notcovered 123 124 def updateActiveSetscene(self, active): 125 self.setscene_active = active 126 127# These values indicate the next step due to be run in the 128# runQueue state machine 129runQueuePrepare = 2 130runQueueSceneInit = 3 131runQueueRunning = 6 132runQueueFailed = 7 133runQueueCleanUp = 8 134runQueueComplete = 9 135 136class RunQueueScheduler(object): 137 """ 138 Control the order tasks are scheduled in. 139 """ 140 name = "basic" 141 142 def __init__(self, runqueue, rqdata): 143 """ 144 The default scheduler just returns the first buildable task (the 145 priority map is sorted by task number) 146 """ 147 self.rq = runqueue 148 self.rqdata = rqdata 149 self.numTasks = len(self.rqdata.runtaskentries) 150 151 self.prio_map = [self.rqdata.runtaskentries.keys()] 152 153 self.buildable = set() 154 self.skip_maxthread = {} 155 self.stamps = {} 156 for tid in self.rqdata.runtaskentries: 157 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 158 self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 159 if tid in self.rq.runq_buildable: 160 self.buildable.append(tid) 161 162 self.rev_prio_map = None 163 self.is_pressure_usable() 164 165 def is_pressure_usable(self): 166 """ 167 If monitoring pressure, return True if pressure files can be open and read. For example 168 openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported) 169 is returned. 170 """ 171 if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure: 172 try: 173 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 174 open("/proc/pressure/io") as io_pressure_fds, \ 175 open("/proc/pressure/memory") as memory_pressure_fds: 176 177 self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 178 self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 179 self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 180 self.prev_pressure_time = time.time() 181 self.check_pressure = True 182 except: 183 bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure") 184 self.check_pressure = False 185 else: 186 self.check_pressure = False 187 188 def exceeds_max_pressure(self): 189 """ 190 Monitor the difference in total pressure at least once per second, if 191 BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold. 192 """ 193 if self.check_pressure: 194 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 195 open("/proc/pressure/io") as io_pressure_fds, \ 196 open("/proc/pressure/memory") as memory_pressure_fds: 197 # extract "total" from /proc/pressure/{cpu|io} 198 curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 199 curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 200 curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 201 exceeds_cpu_pressure = self.rq.max_cpu_pressure and (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) > self.rq.max_cpu_pressure 202 exceeds_io_pressure = self.rq.max_io_pressure and (float(curr_io_pressure) - float(self.prev_io_pressure)) > self.rq.max_io_pressure 203 exceeds_memory_pressure = self.rq.max_memory_pressure and (float(curr_memory_pressure) - float(self.prev_memory_pressure)) > self.rq.max_memory_pressure 204 now = time.time() 205 if now - self.prev_pressure_time > 1.0: 206 self.prev_cpu_pressure = curr_cpu_pressure 207 self.prev_io_pressure = curr_io_pressure 208 self.prev_memory_pressure = curr_memory_pressure 209 self.prev_pressure_time = now 210 return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure) 211 return False 212 213 def next_buildable_task(self): 214 """ 215 Return the id of the first task we find that is buildable 216 """ 217 # Once tasks are running we don't need to worry about them again 218 self.buildable.difference_update(self.rq.runq_running) 219 buildable = set(self.buildable) 220 buildable.difference_update(self.rq.holdoff_tasks) 221 buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered) 222 if not buildable: 223 return None 224 225 # Bitbake requires that at least one task be active. Only check for pressure if 226 # this is the case, otherwise the pressure limitation could result in no tasks 227 # being active and no new tasks started thereby, at times, breaking the scheduler. 228 if self.rq.stats.active and self.exceeds_max_pressure(): 229 return None 230 231 # Filter out tasks that have a max number of threads that have been exceeded 232 skip_buildable = {} 233 for running in self.rq.runq_running.difference(self.rq.runq_complete): 234 rtaskname = taskname_from_tid(running) 235 if rtaskname not in self.skip_maxthread: 236 self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads") 237 if not self.skip_maxthread[rtaskname]: 238 continue 239 if rtaskname in skip_buildable: 240 skip_buildable[rtaskname] += 1 241 else: 242 skip_buildable[rtaskname] = 1 243 244 if len(buildable) == 1: 245 tid = buildable.pop() 246 taskname = taskname_from_tid(tid) 247 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 248 return None 249 stamp = self.stamps[tid] 250 if stamp not in self.rq.build_stamps.values(): 251 return tid 252 253 if not self.rev_prio_map: 254 self.rev_prio_map = {} 255 for tid in self.rqdata.runtaskentries: 256 self.rev_prio_map[tid] = self.prio_map.index(tid) 257 258 best = None 259 bestprio = None 260 for tid in buildable: 261 taskname = taskname_from_tid(tid) 262 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 263 continue 264 prio = self.rev_prio_map[tid] 265 if bestprio is None or bestprio > prio: 266 stamp = self.stamps[tid] 267 if stamp in self.rq.build_stamps.values(): 268 continue 269 bestprio = prio 270 best = tid 271 272 return best 273 274 def next(self): 275 """ 276 Return the id of the task we should build next 277 """ 278 if self.rq.can_start_task(): 279 return self.next_buildable_task() 280 281 def newbuildable(self, task): 282 self.buildable.add(task) 283 284 def removebuildable(self, task): 285 self.buildable.remove(task) 286 287 def describe_task(self, taskid): 288 result = 'ID %s' % taskid 289 if self.rev_prio_map: 290 result = result + (' pri %d' % self.rev_prio_map[taskid]) 291 return result 292 293 def dump_prio(self, comment): 294 bb.debug(3, '%s (most important first):\n%s' % 295 (comment, 296 '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for 297 index, taskid in enumerate(self.prio_map)]))) 298 299class RunQueueSchedulerSpeed(RunQueueScheduler): 300 """ 301 A scheduler optimised for speed. The priority map is sorted by task weight, 302 heavier weighted tasks (tasks needed by the most other tasks) are run first. 303 """ 304 name = "speed" 305 306 def __init__(self, runqueue, rqdata): 307 """ 308 The priority map is sorted by task weight. 309 """ 310 RunQueueScheduler.__init__(self, runqueue, rqdata) 311 312 weights = {} 313 for tid in self.rqdata.runtaskentries: 314 weight = self.rqdata.runtaskentries[tid].weight 315 if not weight in weights: 316 weights[weight] = [] 317 weights[weight].append(tid) 318 319 self.prio_map = [] 320 for weight in sorted(weights): 321 for w in weights[weight]: 322 self.prio_map.append(w) 323 324 self.prio_map.reverse() 325 326class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): 327 """ 328 A scheduler optimised to complete .bb files as quickly as possible. The 329 priority map is sorted by task weight, but then reordered so once a given 330 .bb file starts to build, it's completed as quickly as possible by 331 running all tasks related to the same .bb file one after the after. 332 This works well where disk space is at a premium and classes like OE's 333 rm_work are in force. 334 """ 335 name = "completion" 336 337 def __init__(self, runqueue, rqdata): 338 super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata) 339 340 # Extract list of tasks for each recipe, with tasks sorted 341 # ascending from "must run first" (typically do_fetch) to 342 # "runs last" (do_build). The speed scheduler prioritizes 343 # tasks that must run first before the ones that run later; 344 # this is what we depend on here. 345 task_lists = {} 346 for taskid in self.prio_map: 347 fn, taskname = taskid.rsplit(':', 1) 348 task_lists.setdefault(fn, []).append(taskname) 349 350 # Now unify the different task lists. The strategy is that 351 # common tasks get skipped and new ones get inserted after the 352 # preceeding common one(s) as they are found. Because task 353 # lists should differ only by their number of tasks, but not 354 # the ordering of the common tasks, this should result in a 355 # deterministic result that is a superset of the individual 356 # task ordering. 357 all_tasks = [] 358 for recipe, new_tasks in task_lists.items(): 359 index = 0 360 old_task = all_tasks[index] if index < len(all_tasks) else None 361 for new_task in new_tasks: 362 if old_task == new_task: 363 # Common task, skip it. This is the fast-path which 364 # avoids a full search. 365 index += 1 366 old_task = all_tasks[index] if index < len(all_tasks) else None 367 else: 368 try: 369 index = all_tasks.index(new_task) 370 # Already present, just not at the current 371 # place. We re-synchronized by changing the 372 # index so that it matches again. Now 373 # move on to the next existing task. 374 index += 1 375 old_task = all_tasks[index] if index < len(all_tasks) else None 376 except ValueError: 377 # Not present. Insert before old_task, which 378 # remains the same (but gets shifted back). 379 all_tasks.insert(index, new_task) 380 index += 1 381 bb.debug(3, 'merged task list: %s' % all_tasks) 382 383 # Now reverse the order so that tasks that finish the work on one 384 # recipe are considered more imporant (= come first). The ordering 385 # is now so that do_build is most important. 386 all_tasks.reverse() 387 388 # Group tasks of the same kind before tasks of less important 389 # kinds at the head of the queue (because earlier = lower 390 # priority number = runs earlier), while preserving the 391 # ordering by recipe. If recipe foo is more important than 392 # bar, then the goal is to work on foo's do_populate_sysroot 393 # before bar's do_populate_sysroot and on the more important 394 # tasks of foo before any of the less important tasks in any 395 # other recipe (if those other recipes are more important than 396 # foo). 397 # 398 # All of this only applies when tasks are runable. Explicit 399 # dependencies still override this ordering by priority. 400 # 401 # Here's an example why this priority re-ordering helps with 402 # minimizing disk usage. Consider a recipe foo with a higher 403 # priority than bar where foo DEPENDS on bar. Then the 404 # implicit rule (from base.bbclass) is that foo's do_configure 405 # depends on bar's do_populate_sysroot. This ensures that 406 # bar's do_populate_sysroot gets done first. Normally the 407 # tasks from foo would continue to run once that is done, and 408 # bar only gets completed and cleaned up later. By ordering 409 # bar's task that depend on bar's do_populate_sysroot before foo's 410 # do_configure, that problem gets avoided. 411 task_index = 0 412 self.dump_prio('original priorities') 413 for task in all_tasks: 414 for index in range(task_index, self.numTasks): 415 taskid = self.prio_map[index] 416 taskname = taskid.rsplit(':', 1)[1] 417 if taskname == task: 418 del self.prio_map[index] 419 self.prio_map.insert(task_index, taskid) 420 task_index += 1 421 self.dump_prio('completion priorities') 422 423class RunTaskEntry(object): 424 def __init__(self): 425 self.depends = set() 426 self.revdeps = set() 427 self.hash = None 428 self.unihash = None 429 self.task = None 430 self.weight = 1 431 432class RunQueueData: 433 """ 434 BitBake Run Queue implementation 435 """ 436 def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets): 437 self.cooker = cooker 438 self.dataCaches = dataCaches 439 self.taskData = taskData 440 self.targets = targets 441 self.rq = rq 442 self.warn_multi_bb = False 443 444 self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split() 445 self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets) 446 self.setscene_ignore_tasks_checked = False 447 self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1") 448 self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter() 449 450 self.reset() 451 452 def reset(self): 453 self.runtaskentries = {} 454 455 def runq_depends_names(self, ids): 456 import re 457 ret = [] 458 for id in ids: 459 nam = os.path.basename(id) 460 nam = re.sub("_[^,]*,", ",", nam) 461 ret.extend([nam]) 462 return ret 463 464 def get_task_hash(self, tid): 465 return self.runtaskentries[tid].hash 466 467 def get_task_unihash(self, tid): 468 return self.runtaskentries[tid].unihash 469 470 def get_user_idstring(self, tid, task_name_suffix = ""): 471 return tid + task_name_suffix 472 473 def get_short_user_idstring(self, task, task_name_suffix = ""): 474 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 475 pn = self.dataCaches[mc].pkg_fn[taskfn] 476 taskname = taskname_from_tid(task) + task_name_suffix 477 return "%s:%s" % (pn, taskname) 478 479 def circular_depchains_handler(self, tasks): 480 """ 481 Some tasks aren't buildable, likely due to circular dependency issues. 482 Identify the circular dependencies and print them in a user readable format. 483 """ 484 from copy import deepcopy 485 486 valid_chains = [] 487 explored_deps = {} 488 msgs = [] 489 490 class TooManyLoops(Exception): 491 pass 492 493 def chain_reorder(chain): 494 """ 495 Reorder a dependency chain so the lowest task id is first 496 """ 497 lowest = 0 498 new_chain = [] 499 for entry in range(len(chain)): 500 if chain[entry] < chain[lowest]: 501 lowest = entry 502 new_chain.extend(chain[lowest:]) 503 new_chain.extend(chain[:lowest]) 504 return new_chain 505 506 def chain_compare_equal(chain1, chain2): 507 """ 508 Compare two dependency chains and see if they're the same 509 """ 510 if len(chain1) != len(chain2): 511 return False 512 for index in range(len(chain1)): 513 if chain1[index] != chain2[index]: 514 return False 515 return True 516 517 def chain_array_contains(chain, chain_array): 518 """ 519 Return True if chain_array contains chain 520 """ 521 for ch in chain_array: 522 if chain_compare_equal(ch, chain): 523 return True 524 return False 525 526 def find_chains(tid, prev_chain): 527 prev_chain.append(tid) 528 total_deps = [] 529 total_deps.extend(self.runtaskentries[tid].revdeps) 530 for revdep in self.runtaskentries[tid].revdeps: 531 if revdep in prev_chain: 532 idx = prev_chain.index(revdep) 533 # To prevent duplicates, reorder the chain to start with the lowest taskid 534 # and search through an array of those we've already printed 535 chain = prev_chain[idx:] 536 new_chain = chain_reorder(chain) 537 if not chain_array_contains(new_chain, valid_chains): 538 valid_chains.append(new_chain) 539 msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) 540 for dep in new_chain: 541 msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends))) 542 msgs.append("\n") 543 if len(valid_chains) > 10: 544 msgs.append("Halted dependency loops search after 10 matches.\n") 545 raise TooManyLoops 546 continue 547 scan = False 548 if revdep not in explored_deps: 549 scan = True 550 elif revdep in explored_deps[revdep]: 551 scan = True 552 else: 553 for dep in prev_chain: 554 if dep in explored_deps[revdep]: 555 scan = True 556 if scan: 557 find_chains(revdep, copy.deepcopy(prev_chain)) 558 for dep in explored_deps[revdep]: 559 if dep not in total_deps: 560 total_deps.append(dep) 561 562 explored_deps[tid] = total_deps 563 564 try: 565 for task in tasks: 566 find_chains(task, []) 567 except TooManyLoops: 568 pass 569 570 return msgs 571 572 def calculate_task_weights(self, endpoints): 573 """ 574 Calculate a number representing the "weight" of each task. Heavier weighted tasks 575 have more dependencies and hence should be executed sooner for maximum speed. 576 577 This function also sanity checks the task list finding tasks that are not 578 possible to execute due to circular dependencies. 579 """ 580 581 numTasks = len(self.runtaskentries) 582 weight = {} 583 deps_left = {} 584 task_done = {} 585 586 for tid in self.runtaskentries: 587 task_done[tid] = False 588 weight[tid] = 1 589 deps_left[tid] = len(self.runtaskentries[tid].revdeps) 590 591 for tid in endpoints: 592 weight[tid] = 10 593 task_done[tid] = True 594 595 while True: 596 next_points = [] 597 for tid in endpoints: 598 for revdep in self.runtaskentries[tid].depends: 599 weight[revdep] = weight[revdep] + weight[tid] 600 deps_left[revdep] = deps_left[revdep] - 1 601 if deps_left[revdep] == 0: 602 next_points.append(revdep) 603 task_done[revdep] = True 604 endpoints = next_points 605 if not next_points: 606 break 607 608 # Circular dependency sanity check 609 problem_tasks = [] 610 for tid in self.runtaskentries: 611 if task_done[tid] is False or deps_left[tid] != 0: 612 problem_tasks.append(tid) 613 logger.debug2("Task %s is not buildable", tid) 614 logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) 615 self.runtaskentries[tid].weight = weight[tid] 616 617 if problem_tasks: 618 message = "%s unbuildable tasks were found.\n" % len(problem_tasks) 619 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" 620 message = message + "Identifying dependency loops (this may take a short while)...\n" 621 logger.error(message) 622 623 msgs = self.circular_depchains_handler(problem_tasks) 624 625 message = "\n" 626 for msg in msgs: 627 message = message + msg 628 bb.msg.fatal("RunQueue", message) 629 630 return weight 631 632 def prepare(self): 633 """ 634 Turn a set of taskData into a RunQueue and compute data needed 635 to optimise the execution order. 636 """ 637 638 runq_build = {} 639 recursivetasks = {} 640 recursiveitasks = {} 641 recursivetasksselfref = set() 642 643 taskData = self.taskData 644 645 found = False 646 for mc in self.taskData: 647 if taskData[mc].taskentries: 648 found = True 649 break 650 if not found: 651 # Nothing to do 652 return 0 653 654 self.init_progress_reporter.start() 655 self.init_progress_reporter.next_stage() 656 657 # Step A - Work out a list of tasks to run 658 # 659 # Taskdata gives us a list of possible providers for every build and run 660 # target ordered by priority. It also gives information on each of those 661 # providers. 662 # 663 # To create the actual list of tasks to execute we fix the list of 664 # providers and then resolve the dependencies into task IDs. This 665 # process is repeated for each type of dependency (tdepends, deptask, 666 # rdeptast, recrdeptask, idepends). 667 668 def add_build_dependencies(depids, tasknames, depends, mc): 669 for depname in depids: 670 # Won't be in build_targets if ASSUME_PROVIDED 671 if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]: 672 continue 673 depdata = taskData[mc].build_targets[depname][0] 674 if depdata is None: 675 continue 676 for taskname in tasknames: 677 t = depdata + ":" + taskname 678 if t in taskData[mc].taskentries: 679 depends.add(t) 680 681 def add_runtime_dependencies(depids, tasknames, depends, mc): 682 for depname in depids: 683 if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]: 684 continue 685 depdata = taskData[mc].run_targets[depname][0] 686 if depdata is None: 687 continue 688 for taskname in tasknames: 689 t = depdata + ":" + taskname 690 if t in taskData[mc].taskentries: 691 depends.add(t) 692 693 def add_mc_dependencies(mc, tid): 694 mcdeps = taskData[mc].get_mcdepends() 695 for dep in mcdeps: 696 mcdependency = dep.split(':') 697 pn = mcdependency[3] 698 frommc = mcdependency[1] 699 mcdep = mcdependency[2] 700 deptask = mcdependency[4] 701 if mc == frommc: 702 fn = taskData[mcdep].build_targets[pn][0] 703 newdep = '%s:%s' % (fn,deptask) 704 taskData[mc].taskentries[tid].tdepends.append(newdep) 705 706 for mc in taskData: 707 for tid in taskData[mc].taskentries: 708 709 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 710 #runtid = build_tid(mc, fn, taskname) 711 712 #logger.debug2("Processing %s,%s:%s", mc, fn, taskname) 713 714 depends = set() 715 task_deps = self.dataCaches[mc].task_deps[taskfn] 716 717 self.runtaskentries[tid] = RunTaskEntry() 718 719 if fn in taskData[mc].failed_fns: 720 continue 721 722 # We add multiconfig dependencies before processing internal task deps (tdepends) 723 if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']: 724 add_mc_dependencies(mc, tid) 725 726 # Resolve task internal dependencies 727 # 728 # e.g. addtask before X after Y 729 for t in taskData[mc].taskentries[tid].tdepends: 730 (depmc, depfn, deptaskname, _) = split_tid_mcfn(t) 731 depends.add(build_tid(depmc, depfn, deptaskname)) 732 733 # Resolve 'deptask' dependencies 734 # 735 # e.g. do_sometask[deptask] = "do_someothertask" 736 # (makes sure sometask runs after someothertask of all DEPENDS) 737 if 'deptask' in task_deps and taskname in task_deps['deptask']: 738 tasknames = task_deps['deptask'][taskname].split() 739 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 740 741 # Resolve 'rdeptask' dependencies 742 # 743 # e.g. do_sometask[rdeptask] = "do_someothertask" 744 # (makes sure sometask runs after someothertask of all RDEPENDS) 745 if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']: 746 tasknames = task_deps['rdeptask'][taskname].split() 747 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 748 749 # Resolve inter-task dependencies 750 # 751 # e.g. do_sometask[depends] = "targetname:do_someothertask" 752 # (makes sure sometask runs after targetname's someothertask) 753 idepends = taskData[mc].taskentries[tid].idepends 754 for (depname, idependtask) in idepends: 755 if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps: 756 # Won't be in build_targets if ASSUME_PROVIDED 757 depdata = taskData[mc].build_targets[depname][0] 758 if depdata is not None: 759 t = depdata + ":" + idependtask 760 depends.add(t) 761 if t not in taskData[mc].taskentries: 762 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 763 irdepends = taskData[mc].taskentries[tid].irdepends 764 for (depname, idependtask) in irdepends: 765 if depname in taskData[mc].run_targets: 766 # Won't be in run_targets if ASSUME_PROVIDED 767 if not taskData[mc].run_targets[depname]: 768 continue 769 depdata = taskData[mc].run_targets[depname][0] 770 if depdata is not None: 771 t = depdata + ":" + idependtask 772 depends.add(t) 773 if t not in taskData[mc].taskentries: 774 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 775 776 # Resolve recursive 'recrdeptask' dependencies (Part A) 777 # 778 # e.g. do_sometask[recrdeptask] = "do_someothertask" 779 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 780 # We cover the recursive part of the dependencies below 781 if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']: 782 tasknames = task_deps['recrdeptask'][taskname].split() 783 recursivetasks[tid] = tasknames 784 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 785 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 786 if taskname in tasknames: 787 recursivetasksselfref.add(tid) 788 789 if 'recideptask' in task_deps and taskname in task_deps['recideptask']: 790 recursiveitasks[tid] = [] 791 for t in task_deps['recideptask'][taskname].split(): 792 newdep = build_tid(mc, fn, t) 793 recursiveitasks[tid].append(newdep) 794 795 self.runtaskentries[tid].depends = depends 796 # Remove all self references 797 self.runtaskentries[tid].depends.discard(tid) 798 799 #self.dump_data() 800 801 self.init_progress_reporter.next_stage() 802 803 # Resolve recursive 'recrdeptask' dependencies (Part B) 804 # 805 # e.g. do_sometask[recrdeptask] = "do_someothertask" 806 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 807 # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed 808 809 # Generating/interating recursive lists of dependencies is painful and potentially slow 810 # Precompute recursive task dependencies here by: 811 # a) create a temp list of reverse dependencies (revdeps) 812 # b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0) 813 # c) combine the total list of dependencies in cumulativedeps 814 # d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower) 815 816 817 revdeps = {} 818 deps = {} 819 cumulativedeps = {} 820 for tid in self.runtaskentries: 821 deps[tid] = set(self.runtaskentries[tid].depends) 822 revdeps[tid] = set() 823 cumulativedeps[tid] = set() 824 # Generate a temp list of reverse dependencies 825 for tid in self.runtaskentries: 826 for dep in self.runtaskentries[tid].depends: 827 revdeps[dep].add(tid) 828 # Find the dependency chain endpoints 829 endpoints = set() 830 for tid in self.runtaskentries: 831 if not deps[tid]: 832 endpoints.add(tid) 833 # Iterate the chains collating dependencies 834 while endpoints: 835 next = set() 836 for tid in endpoints: 837 for dep in revdeps[tid]: 838 cumulativedeps[dep].add(fn_from_tid(tid)) 839 cumulativedeps[dep].update(cumulativedeps[tid]) 840 if tid in deps[dep]: 841 deps[dep].remove(tid) 842 if not deps[dep]: 843 next.add(dep) 844 endpoints = next 845 #for tid in deps: 846 # if deps[tid]: 847 # bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid])) 848 849 # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to 850 # resolve these recursively until we aren't adding any further extra dependencies 851 extradeps = True 852 while extradeps: 853 extradeps = 0 854 for tid in recursivetasks: 855 tasknames = recursivetasks[tid] 856 857 totaldeps = set(self.runtaskentries[tid].depends) 858 if tid in recursiveitasks: 859 totaldeps.update(recursiveitasks[tid]) 860 for dep in recursiveitasks[tid]: 861 if dep not in self.runtaskentries: 862 continue 863 totaldeps.update(self.runtaskentries[dep].depends) 864 865 deps = set() 866 for dep in totaldeps: 867 if dep in cumulativedeps: 868 deps.update(cumulativedeps[dep]) 869 870 for t in deps: 871 for taskname in tasknames: 872 newtid = t + ":" + taskname 873 if newtid == tid: 874 continue 875 if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends: 876 extradeps += 1 877 self.runtaskentries[tid].depends.add(newtid) 878 879 # Handle recursive tasks which depend upon other recursive tasks 880 deps = set() 881 for dep in self.runtaskentries[tid].depends.intersection(recursivetasks): 882 deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends)) 883 for newtid in deps: 884 for taskname in tasknames: 885 if not newtid.endswith(":" + taskname): 886 continue 887 if newtid in self.runtaskentries: 888 extradeps += 1 889 self.runtaskentries[tid].depends.add(newtid) 890 891 bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps) 892 893 # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work 894 for tid in recursivetasksselfref: 895 self.runtaskentries[tid].depends.difference_update(recursivetasksselfref) 896 897 self.init_progress_reporter.next_stage() 898 899 #self.dump_data() 900 901 # Step B - Mark all active tasks 902 # 903 # Start with the tasks we were asked to run and mark all dependencies 904 # as active too. If the task is to be 'forced', clear its stamp. Once 905 # all active tasks are marked, prune the ones we don't need. 906 907 logger.verbose("Marking Active Tasks") 908 909 def mark_active(tid, depth): 910 """ 911 Mark an item as active along with its depends 912 (calls itself recursively) 913 """ 914 915 if tid in runq_build: 916 return 917 918 runq_build[tid] = 1 919 920 depends = self.runtaskentries[tid].depends 921 for depend in depends: 922 mark_active(depend, depth+1) 923 924 def invalidate_task(tid, error_nostamp): 925 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 926 taskdep = self.dataCaches[mc].task_deps[taskfn] 927 if fn + ":" + taskname not in taskData[mc].taskentries: 928 logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) 929 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 930 if error_nostamp: 931 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) 932 else: 933 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) 934 else: 935 logger.verbose("Invalidate task %s, %s", taskname, fn) 936 bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn) 937 938 self.target_tids = [] 939 for (mc, target, task, fn) in self.targets: 940 941 if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]: 942 continue 943 944 if target in taskData[mc].failed_deps: 945 continue 946 947 parents = False 948 if task.endswith('-'): 949 parents = True 950 task = task[:-1] 951 952 if fn in taskData[mc].failed_fns: 953 continue 954 955 # fn already has mc prefix 956 tid = fn + ":" + task 957 self.target_tids.append(tid) 958 if tid not in taskData[mc].taskentries: 959 import difflib 960 tasks = [] 961 for x in taskData[mc].taskentries: 962 if x.startswith(fn + ":"): 963 tasks.append(taskname_from_tid(x)) 964 close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7) 965 if close_matches: 966 extra = ". Close matches:\n %s" % "\n ".join(close_matches) 967 else: 968 extra = "" 969 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra)) 970 971 # For tasks called "XXXX-", ony run their dependencies 972 if parents: 973 for i in self.runtaskentries[tid].depends: 974 mark_active(i, 1) 975 else: 976 mark_active(tid, 1) 977 978 self.init_progress_reporter.next_stage() 979 980 # Step C - Prune all inactive tasks 981 # 982 # Once all active tasks are marked, prune the ones we don't need. 983 984 # Handle --runall 985 if self.cooker.configuration.runall: 986 # re-run the mark_active and then drop unused tasks from new list 987 reduced_tasklist = set(self.runtaskentries.keys()) 988 for tid in list(self.runtaskentries.keys()): 989 if tid not in runq_build: 990 reduced_tasklist.remove(tid) 991 runq_build = {} 992 993 for task in self.cooker.configuration.runall: 994 if not task.startswith("do_"): 995 task = "do_{0}".format(task) 996 runall_tids = set() 997 for tid in reduced_tasklist: 998 wanttid = "{0}:{1}".format(fn_from_tid(tid), task) 999 if wanttid in self.runtaskentries: 1000 runall_tids.add(wanttid) 1001 1002 for tid in list(runall_tids): 1003 mark_active(tid, 1) 1004 if self.cooker.configuration.force: 1005 invalidate_task(tid, False) 1006 1007 delcount = set() 1008 for tid in list(self.runtaskentries.keys()): 1009 if tid not in runq_build: 1010 delcount.add(tid) 1011 del self.runtaskentries[tid] 1012 1013 if self.cooker.configuration.runall: 1014 if not self.runtaskentries: 1015 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets))) 1016 1017 self.init_progress_reporter.next_stage() 1018 1019 # Handle runonly 1020 if self.cooker.configuration.runonly: 1021 # re-run the mark_active and then drop unused tasks from new list 1022 runq_build = {} 1023 1024 for task in self.cooker.configuration.runonly: 1025 if not task.startswith("do_"): 1026 task = "do_{0}".format(task) 1027 runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task] 1028 1029 for tid in runonly_tids: 1030 mark_active(tid, 1) 1031 if self.cooker.configuration.force: 1032 invalidate_task(tid, False) 1033 1034 for tid in list(self.runtaskentries.keys()): 1035 if tid not in runq_build: 1036 delcount.add(tid) 1037 del self.runtaskentries[tid] 1038 1039 if not self.runtaskentries: 1040 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets))) 1041 1042 # 1043 # Step D - Sanity checks and computation 1044 # 1045 1046 # Check to make sure we still have tasks to run 1047 if not self.runtaskentries: 1048 if not taskData[''].halt: 1049 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") 1050 else: 1051 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") 1052 1053 logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries)) 1054 1055 logger.verbose("Assign Weightings") 1056 1057 self.init_progress_reporter.next_stage() 1058 1059 # Generate a list of reverse dependencies to ease future calculations 1060 for tid in self.runtaskentries: 1061 for dep in self.runtaskentries[tid].depends: 1062 self.runtaskentries[dep].revdeps.add(tid) 1063 1064 self.init_progress_reporter.next_stage() 1065 1066 # Identify tasks at the end of dependency chains 1067 # Error on circular dependency loops (length two) 1068 endpoints = [] 1069 for tid in self.runtaskentries: 1070 revdeps = self.runtaskentries[tid].revdeps 1071 if not revdeps: 1072 endpoints.append(tid) 1073 for dep in revdeps: 1074 if dep in self.runtaskentries[tid].depends: 1075 bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep)) 1076 1077 1078 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) 1079 1080 self.init_progress_reporter.next_stage() 1081 1082 # Calculate task weights 1083 # Check of higher length circular dependencies 1084 self.runq_weight = self.calculate_task_weights(endpoints) 1085 1086 self.init_progress_reporter.next_stage() 1087 1088 # Sanity Check - Check for multiple tasks building the same provider 1089 for mc in self.dataCaches: 1090 prov_list = {} 1091 seen_fn = [] 1092 for tid in self.runtaskentries: 1093 (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1094 if taskfn in seen_fn: 1095 continue 1096 if mc != tidmc: 1097 continue 1098 seen_fn.append(taskfn) 1099 for prov in self.dataCaches[mc].fn_provides[taskfn]: 1100 if prov not in prov_list: 1101 prov_list[prov] = [taskfn] 1102 elif taskfn not in prov_list[prov]: 1103 prov_list[prov].append(taskfn) 1104 for prov in prov_list: 1105 if len(prov_list[prov]) < 2: 1106 continue 1107 if prov in self.multi_provider_allowed: 1108 continue 1109 seen_pn = [] 1110 # If two versions of the same PN are being built its fatal, we don't support it. 1111 for fn in prov_list[prov]: 1112 pn = self.dataCaches[mc].pkg_fn[fn] 1113 if pn not in seen_pn: 1114 seen_pn.append(pn) 1115 else: 1116 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) 1117 msgs = ["Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))] 1118 # 1119 # Construct a list of things which uniquely depend on each provider 1120 # since this may help the user figure out which dependency is triggering this warning 1121 # 1122 msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.") 1123 deplist = {} 1124 commondeps = None 1125 for provfn in prov_list[prov]: 1126 deps = set() 1127 for tid in self.runtaskentries: 1128 fn = fn_from_tid(tid) 1129 if fn != provfn: 1130 continue 1131 for dep in self.runtaskentries[tid].revdeps: 1132 fn = fn_from_tid(dep) 1133 if fn == provfn: 1134 continue 1135 deps.add(dep) 1136 if not commondeps: 1137 commondeps = set(deps) 1138 else: 1139 commondeps &= deps 1140 deplist[provfn] = deps 1141 for provfn in deplist: 1142 msgs.append("\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps))) 1143 # 1144 # Construct a list of provides and runtime providers for each recipe 1145 # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC) 1146 # 1147 msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.") 1148 provide_results = {} 1149 rprovide_results = {} 1150 commonprovs = None 1151 commonrprovs = None 1152 for provfn in prov_list[prov]: 1153 provides = set(self.dataCaches[mc].fn_provides[provfn]) 1154 rprovides = set() 1155 for rprovide in self.dataCaches[mc].rproviders: 1156 if provfn in self.dataCaches[mc].rproviders[rprovide]: 1157 rprovides.add(rprovide) 1158 for package in self.dataCaches[mc].packages: 1159 if provfn in self.dataCaches[mc].packages[package]: 1160 rprovides.add(package) 1161 for package in self.dataCaches[mc].packages_dynamic: 1162 if provfn in self.dataCaches[mc].packages_dynamic[package]: 1163 rprovides.add(package) 1164 if not commonprovs: 1165 commonprovs = set(provides) 1166 else: 1167 commonprovs &= provides 1168 provide_results[provfn] = provides 1169 if not commonrprovs: 1170 commonrprovs = set(rprovides) 1171 else: 1172 commonrprovs &= rprovides 1173 rprovide_results[provfn] = rprovides 1174 #msgs.append("\nCommon provides:\n %s" % ("\n ".join(commonprovs))) 1175 #msgs.append("\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs))) 1176 for provfn in prov_list[prov]: 1177 msgs.append("\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs))) 1178 msgs.append("\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs))) 1179 1180 if self.warn_multi_bb: 1181 logger.verbnote("".join(msgs)) 1182 else: 1183 logger.error("".join(msgs)) 1184 1185 self.init_progress_reporter.next_stage() 1186 self.init_progress_reporter.next_stage() 1187 1188 # Iterate over the task list looking for tasks with a 'setscene' function 1189 self.runq_setscene_tids = set() 1190 if not self.cooker.configuration.nosetscene: 1191 for tid in self.runtaskentries: 1192 (mc, fn, taskname, _) = split_tid_mcfn(tid) 1193 setscenetid = tid + "_setscene" 1194 if setscenetid not in taskData[mc].taskentries: 1195 continue 1196 self.runq_setscene_tids.add(tid) 1197 1198 self.init_progress_reporter.next_stage() 1199 1200 # Invalidate task if force mode active 1201 if self.cooker.configuration.force: 1202 for tid in self.target_tids: 1203 invalidate_task(tid, False) 1204 1205 # Invalidate task if invalidate mode active 1206 if self.cooker.configuration.invalidate_stamp: 1207 for tid in self.target_tids: 1208 fn = fn_from_tid(tid) 1209 for st in self.cooker.configuration.invalidate_stamp.split(','): 1210 if not st.startswith("do_"): 1211 st = "do_%s" % st 1212 invalidate_task(fn + ":" + st, True) 1213 1214 self.init_progress_reporter.next_stage() 1215 1216 # Create and print to the logs a virtual/xxxx -> PN (fn) table 1217 for mc in taskData: 1218 virtmap = taskData[mc].get_providermap(prefix="virtual/") 1219 virtpnmap = {} 1220 for v in virtmap: 1221 virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]] 1222 bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v])) 1223 if hasattr(bb.parse.siggen, "tasks_resolved"): 1224 bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc]) 1225 1226 self.init_progress_reporter.next_stage() 1227 1228 bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) 1229 1230 # Iterate over the task list and call into the siggen code 1231 dealtwith = set() 1232 todeal = set(self.runtaskentries) 1233 while todeal: 1234 for tid in todeal.copy(): 1235 if not (self.runtaskentries[tid].depends - dealtwith): 1236 dealtwith.add(tid) 1237 todeal.remove(tid) 1238 self.prepare_task_hash(tid) 1239 1240 bb.parse.siggen.writeout_file_checksum_cache() 1241 1242 #self.dump_data() 1243 return len(self.runtaskentries) 1244 1245 def prepare_task_hash(self, tid): 1246 dc = bb.parse.siggen.get_data_caches(self.dataCaches, mc_from_tid(tid)) 1247 bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, dc) 1248 self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, dc) 1249 self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) 1250 1251 def dump_data(self): 1252 """ 1253 Dump some debug information on the internal data structures 1254 """ 1255 logger.debug3("run_tasks:") 1256 for tid in self.runtaskentries: 1257 logger.debug3(" %s: %s Deps %s RevDeps %s", tid, 1258 self.runtaskentries[tid].weight, 1259 self.runtaskentries[tid].depends, 1260 self.runtaskentries[tid].revdeps) 1261 1262class RunQueueWorker(): 1263 def __init__(self, process, pipe): 1264 self.process = process 1265 self.pipe = pipe 1266 1267class RunQueue: 1268 def __init__(self, cooker, cfgData, dataCaches, taskData, targets): 1269 1270 self.cooker = cooker 1271 self.cfgData = cfgData 1272 self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets) 1273 1274 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None 1275 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None 1276 1277 self.state = runQueuePrepare 1278 1279 # For disk space monitor 1280 # Invoked at regular time intervals via the bitbake heartbeat event 1281 # while the build is running. We generate a unique name for the handler 1282 # here, just in case that there ever is more than one RunQueue instance, 1283 # start the handler when reaching runQueueSceneInit, and stop it when 1284 # done with the build. 1285 self.dm = monitordisk.diskMonitor(cfgData) 1286 self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self)) 1287 self.dm_event_handler_registered = False 1288 self.rqexe = None 1289 self.worker = {} 1290 self.fakeworker = {} 1291 1292 def _start_worker(self, mc, fakeroot = False, rqexec = None): 1293 logger.debug("Starting bitbake-worker") 1294 magic = "decafbad" 1295 if self.cooker.configuration.profile: 1296 magic = "decafbadbad" 1297 fakerootlogs = None 1298 if fakeroot: 1299 magic = magic + "beef" 1300 mcdata = self.cooker.databuilder.mcdata[mc] 1301 fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD")) 1302 fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split() 1303 env = os.environ.copy() 1304 for key, value in (var.split('=') for var in fakerootenv): 1305 env[key] = value 1306 worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) 1307 fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs 1308 else: 1309 worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) 1310 bb.utils.nonblockingfd(worker.stdout) 1311 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs) 1312 1313 workerdata = { 1314 "taskdeps" : self.rqdata.dataCaches[mc].task_deps, 1315 "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv, 1316 "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs, 1317 "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv, 1318 "sigdata" : bb.parse.siggen.get_taskdata(), 1319 "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, 1320 "build_verbose_shell" : self.cooker.configuration.build_verbose_shell, 1321 "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout, 1322 "logdefaultdomain" : bb.msg.loggerDefaultDomains, 1323 "prhost" : self.cooker.prhost, 1324 "buildname" : self.cfgData.getVar("BUILDNAME"), 1325 "date" : self.cfgData.getVar("DATE"), 1326 "time" : self.cfgData.getVar("TIME"), 1327 "hashservaddr" : self.cooker.hashservaddr, 1328 "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), 1329 } 1330 1331 worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") 1332 worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") 1333 worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") 1334 worker.stdin.flush() 1335 1336 return RunQueueWorker(worker, workerpipe) 1337 1338 def _teardown_worker(self, worker): 1339 if not worker: 1340 return 1341 logger.debug("Teardown for bitbake-worker") 1342 try: 1343 worker.process.stdin.write(b"<quit></quit>") 1344 worker.process.stdin.flush() 1345 worker.process.stdin.close() 1346 except IOError: 1347 pass 1348 while worker.process.returncode is None: 1349 worker.pipe.read() 1350 worker.process.poll() 1351 while worker.pipe.read(): 1352 continue 1353 worker.pipe.close() 1354 1355 def start_worker(self): 1356 if self.worker: 1357 self.teardown_workers() 1358 self.teardown = False 1359 for mc in self.rqdata.dataCaches: 1360 self.worker[mc] = self._start_worker(mc) 1361 1362 def start_fakeworker(self, rqexec, mc): 1363 if not mc in self.fakeworker: 1364 self.fakeworker[mc] = self._start_worker(mc, True, rqexec) 1365 1366 def teardown_workers(self): 1367 self.teardown = True 1368 for mc in self.worker: 1369 self._teardown_worker(self.worker[mc]) 1370 self.worker = {} 1371 for mc in self.fakeworker: 1372 self._teardown_worker(self.fakeworker[mc]) 1373 self.fakeworker = {} 1374 1375 def read_workers(self): 1376 for mc in self.worker: 1377 self.worker[mc].pipe.read() 1378 for mc in self.fakeworker: 1379 self.fakeworker[mc].pipe.read() 1380 1381 def active_fds(self): 1382 fds = [] 1383 for mc in self.worker: 1384 fds.append(self.worker[mc].pipe.input) 1385 for mc in self.fakeworker: 1386 fds.append(self.fakeworker[mc].pipe.input) 1387 return fds 1388 1389 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): 1390 def get_timestamp(f): 1391 try: 1392 if not os.access(f, os.F_OK): 1393 return None 1394 return os.stat(f)[stat.ST_MTIME] 1395 except: 1396 return None 1397 1398 (mc, fn, tn, taskfn) = split_tid_mcfn(tid) 1399 if taskname is None: 1400 taskname = tn 1401 1402 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn) 1403 1404 # If the stamp is missing, it's not current 1405 if not os.access(stampfile, os.F_OK): 1406 logger.debug2("Stampfile %s not available", stampfile) 1407 return False 1408 # If it's a 'nostamp' task, it's not current 1409 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1410 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 1411 logger.debug2("%s.%s is nostamp\n", fn, taskname) 1412 return False 1413 1414 if taskname != "do_setscene" and taskname.endswith("_setscene"): 1415 return True 1416 1417 if cache is None: 1418 cache = {} 1419 1420 iscurrent = True 1421 t1 = get_timestamp(stampfile) 1422 for dep in self.rqdata.runtaskentries[tid].depends: 1423 if iscurrent: 1424 (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep) 1425 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2) 1426 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2) 1427 t2 = get_timestamp(stampfile2) 1428 t3 = get_timestamp(stampfile3) 1429 if t3 and not t2: 1430 continue 1431 if t3 and t3 > t2: 1432 continue 1433 if fn == fn2: 1434 if not t2: 1435 logger.debug2('Stampfile %s does not exist', stampfile2) 1436 iscurrent = False 1437 break 1438 if t1 < t2: 1439 logger.debug2('Stampfile %s < %s', stampfile, stampfile2) 1440 iscurrent = False 1441 break 1442 if recurse and iscurrent: 1443 if dep in cache: 1444 iscurrent = cache[dep] 1445 if not iscurrent: 1446 logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) 1447 else: 1448 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) 1449 cache[dep] = iscurrent 1450 if recurse: 1451 cache[tid] = iscurrent 1452 return iscurrent 1453 1454 def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True): 1455 valid = set() 1456 if self.hashvalidate: 1457 sq_data = {} 1458 sq_data['hash'] = {} 1459 sq_data['hashfn'] = {} 1460 sq_data['unihash'] = {} 1461 for tid in tocheck: 1462 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1463 sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash 1464 sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn] 1465 sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash 1466 1467 valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary) 1468 1469 return valid 1470 1471 def validate_hash(self, sq_data, d, siginfo, currentcount, summary): 1472 locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary} 1473 1474 # Metadata has **kwargs so args can be added, sq_data can also gain new fields 1475 call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)" 1476 1477 return bb.utils.better_eval(call, locs) 1478 1479 def _execute_runqueue(self): 1480 """ 1481 Run the tasks in a queue prepared by rqdata.prepare() 1482 Upon failure, optionally try to recover the build using any alternate providers 1483 (if the halt on failure configuration option isn't set) 1484 """ 1485 1486 retval = True 1487 1488 if self.state is runQueuePrepare: 1489 # NOTE: if you add, remove or significantly refactor the stages of this 1490 # process then you should recalculate the weightings here. This is quite 1491 # easy to do - just change the next line temporarily to pass debug=True as 1492 # the last parameter and you'll get a printout of the weightings as well 1493 # as a map to the lines where next_stage() was called. Of course this isn't 1494 # critical, but it helps to keep the progress reporting accurate. 1495 self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data, 1496 "Initialising tasks", 1497 [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244]) 1498 if self.rqdata.prepare() == 0: 1499 self.state = runQueueComplete 1500 else: 1501 self.state = runQueueSceneInit 1502 bb.parse.siggen.save_unitaskhashes() 1503 1504 if self.state is runQueueSceneInit: 1505 self.rqdata.init_progress_reporter.next_stage() 1506 1507 # we are ready to run, emit dependency info to any UI or class which 1508 # needs it 1509 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) 1510 self.rqdata.init_progress_reporter.next_stage() 1511 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) 1512 1513 if not self.dm_event_handler_registered: 1514 res = bb.event.register(self.dm_event_handler_name, 1515 lambda x: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False, 1516 ('bb.event.HeartbeatEvent',), data=self.cfgData) 1517 self.dm_event_handler_registered = True 1518 1519 dump = self.cooker.configuration.dump_signatures 1520 if dump: 1521 self.rqdata.init_progress_reporter.finish() 1522 if 'printdiff' in dump: 1523 invalidtasks = self.print_diffscenetasks() 1524 self.dump_signatures(dump) 1525 if 'printdiff' in dump: 1526 self.write_diffscenetasks(invalidtasks) 1527 self.state = runQueueComplete 1528 1529 if self.state is runQueueSceneInit: 1530 self.rqdata.init_progress_reporter.next_stage() 1531 self.start_worker() 1532 self.rqdata.init_progress_reporter.next_stage() 1533 self.rqexe = RunQueueExecute(self) 1534 1535 # If we don't have any setscene functions, skip execution 1536 if not self.rqdata.runq_setscene_tids: 1537 logger.info('No setscene tasks') 1538 for tid in self.rqdata.runtaskentries: 1539 if not self.rqdata.runtaskentries[tid].depends: 1540 self.rqexe.setbuildable(tid) 1541 self.rqexe.tasks_notcovered.add(tid) 1542 self.rqexe.sqdone = True 1543 logger.info('Executing Tasks') 1544 self.state = runQueueRunning 1545 1546 if self.state is runQueueRunning: 1547 retval = self.rqexe.execute() 1548 1549 if self.state is runQueueCleanUp: 1550 retval = self.rqexe.finish() 1551 1552 build_done = self.state is runQueueComplete or self.state is runQueueFailed 1553 1554 if build_done and self.dm_event_handler_registered: 1555 bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData) 1556 self.dm_event_handler_registered = False 1557 1558 if build_done and self.rqexe: 1559 bb.parse.siggen.save_unitaskhashes() 1560 self.teardown_workers() 1561 if self.rqexe: 1562 if self.rqexe.stats.failed: 1563 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) 1564 else: 1565 # Let's avoid the word "failed" if nothing actually did 1566 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) 1567 1568 if self.state is runQueueFailed: 1569 raise bb.runqueue.TaskFailure(self.rqexe.failed_tids) 1570 1571 if self.state is runQueueComplete: 1572 # All done 1573 return False 1574 1575 # Loop 1576 return retval 1577 1578 def execute_runqueue(self): 1579 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop. 1580 try: 1581 return self._execute_runqueue() 1582 except bb.runqueue.TaskFailure: 1583 raise 1584 except SystemExit: 1585 raise 1586 except bb.BBHandledException: 1587 try: 1588 self.teardown_workers() 1589 except: 1590 pass 1591 self.state = runQueueComplete 1592 raise 1593 except Exception as err: 1594 logger.exception("An uncaught exception occurred in runqueue") 1595 try: 1596 self.teardown_workers() 1597 except: 1598 pass 1599 self.state = runQueueComplete 1600 raise 1601 1602 def finish_runqueue(self, now = False): 1603 if not self.rqexe: 1604 self.state = runQueueComplete 1605 return 1606 1607 if now: 1608 self.rqexe.finish_now() 1609 else: 1610 self.rqexe.finish() 1611 1612 def rq_dump_sigfn(self, fn, options): 1613 bb_cache = bb.cache.NoCache(self.cooker.databuilder) 1614 mc = bb.runqueue.mc_from_tid(fn) 1615 the_data = bb_cache.loadDataFull(fn, self.cooker.collections[mc].get_file_appends(fn)) 1616 siggen = bb.parse.siggen 1617 dataCaches = self.rqdata.dataCaches 1618 siggen.dump_sigfn(fn, dataCaches, options) 1619 1620 def dump_signatures(self, options): 1621 fns = set() 1622 bb.note("Reparsing files to collect dependency data") 1623 1624 for tid in self.rqdata.runtaskentries: 1625 fn = fn_from_tid(tid) 1626 fns.add(fn) 1627 1628 max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1) 1629 # We cannot use the real multiprocessing.Pool easily due to some local data 1630 # that can't be pickled. This is a cheap multi-process solution. 1631 launched = [] 1632 while fns: 1633 if len(launched) < max_process: 1634 p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options)) 1635 p.start() 1636 launched.append(p) 1637 for q in launched: 1638 # The finished processes are joined when calling is_alive() 1639 if not q.is_alive(): 1640 launched.remove(q) 1641 for p in launched: 1642 p.join() 1643 1644 bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options) 1645 1646 return 1647 1648 def print_diffscenetasks(self): 1649 1650 noexec = [] 1651 tocheck = set() 1652 1653 for tid in self.rqdata.runtaskentries: 1654 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1655 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1656 1657 if 'noexec' in taskdep and taskname in taskdep['noexec']: 1658 noexec.append(tid) 1659 continue 1660 1661 tocheck.add(tid) 1662 1663 valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False) 1664 1665 # Tasks which are both setscene and noexec never care about dependencies 1666 # We therefore find tasks which are setscene and noexec and mark their 1667 # unique dependencies as valid. 1668 for tid in noexec: 1669 if tid not in self.rqdata.runq_setscene_tids: 1670 continue 1671 for dep in self.rqdata.runtaskentries[tid].depends: 1672 hasnoexecparents = True 1673 for dep2 in self.rqdata.runtaskentries[dep].revdeps: 1674 if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec: 1675 continue 1676 hasnoexecparents = False 1677 break 1678 if hasnoexecparents: 1679 valid_new.add(dep) 1680 1681 invalidtasks = set() 1682 for tid in self.rqdata.runtaskentries: 1683 if tid not in valid_new and tid not in noexec: 1684 invalidtasks.add(tid) 1685 1686 found = set() 1687 processed = set() 1688 for tid in invalidtasks: 1689 toprocess = set([tid]) 1690 while toprocess: 1691 next = set() 1692 for t in toprocess: 1693 for dep in self.rqdata.runtaskentries[t].depends: 1694 if dep in invalidtasks: 1695 found.add(tid) 1696 if dep not in processed: 1697 processed.add(dep) 1698 next.add(dep) 1699 toprocess = next 1700 if tid in found: 1701 toprocess = set() 1702 1703 tasklist = [] 1704 for tid in invalidtasks.difference(found): 1705 tasklist.append(tid) 1706 1707 if tasklist: 1708 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) 1709 1710 return invalidtasks.difference(found) 1711 1712 def write_diffscenetasks(self, invalidtasks): 1713 1714 # Define recursion callback 1715 def recursecb(key, hash1, hash2): 1716 hashes = [hash1, hash2] 1717 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) 1718 1719 recout = [] 1720 if len(hashfiles) == 2: 1721 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb) 1722 recout.extend(list(' ' + l for l in out2)) 1723 else: 1724 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) 1725 1726 return recout 1727 1728 1729 for tid in invalidtasks: 1730 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1731 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1732 h = self.rqdata.runtaskentries[tid].hash 1733 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc]) 1734 match = None 1735 for m in matches: 1736 if h in m: 1737 match = m 1738 if match is None: 1739 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h) 1740 matches = {k : v for k, v in iter(matches.items()) if h not in k} 1741 if matches: 1742 latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1] 1743 prevh = __find_sha256__.search(latestmatch).group(0) 1744 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) 1745 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) 1746 1747 1748class RunQueueExecute: 1749 1750 def __init__(self, rq): 1751 self.rq = rq 1752 self.cooker = rq.cooker 1753 self.cfgData = rq.cfgData 1754 self.rqdata = rq.rqdata 1755 1756 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) 1757 self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" 1758 self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU") 1759 self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO") 1760 self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY") 1761 1762 self.sq_buildable = set() 1763 self.sq_running = set() 1764 self.sq_live = set() 1765 1766 self.updated_taskhash_queue = [] 1767 self.pending_migrations = set() 1768 1769 self.runq_buildable = set() 1770 self.runq_running = set() 1771 self.runq_complete = set() 1772 self.runq_tasksrun = set() 1773 1774 self.build_stamps = {} 1775 self.build_stamps2 = [] 1776 self.failed_tids = [] 1777 self.sq_deferred = {} 1778 1779 self.stampcache = {} 1780 1781 self.holdoff_tasks = set() 1782 self.holdoff_need_update = True 1783 self.sqdone = False 1784 1785 self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids)) 1786 1787 for mc in rq.worker: 1788 rq.worker[mc].pipe.setrunqueueexec(self) 1789 for mc in rq.fakeworker: 1790 rq.fakeworker[mc].pipe.setrunqueueexec(self) 1791 1792 if self.number_tasks <= 0: 1793 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) 1794 1795 lower_limit = 1.0 1796 upper_limit = 1000000.0 1797 if self.max_cpu_pressure: 1798 self.max_cpu_pressure = float(self.max_cpu_pressure) 1799 if self.max_cpu_pressure < lower_limit: 1800 bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit)) 1801 if self.max_cpu_pressure > upper_limit: 1802 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure)) 1803 1804 if self.max_io_pressure: 1805 self.max_io_pressure = float(self.max_io_pressure) 1806 if self.max_io_pressure < lower_limit: 1807 bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit)) 1808 if self.max_io_pressure > upper_limit: 1809 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1810 1811 if self.max_memory_pressure: 1812 self.max_memory_pressure = float(self.max_memory_pressure) 1813 if self.max_memory_pressure < lower_limit: 1814 bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit)) 1815 if self.max_memory_pressure > upper_limit: 1816 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1817 1818 # List of setscene tasks which we've covered 1819 self.scenequeue_covered = set() 1820 # List of tasks which are covered (including setscene ones) 1821 self.tasks_covered = set() 1822 self.tasks_scenequeue_done = set() 1823 self.scenequeue_notcovered = set() 1824 self.tasks_notcovered = set() 1825 self.scenequeue_notneeded = set() 1826 1827 # We can't skip specified target tasks which aren't setscene tasks 1828 self.cantskip = set(self.rqdata.target_tids) 1829 self.cantskip.difference_update(self.rqdata.runq_setscene_tids) 1830 self.cantskip.intersection_update(self.rqdata.runtaskentries) 1831 1832 schedulers = self.get_schedulers() 1833 for scheduler in schedulers: 1834 if self.scheduler == scheduler.name: 1835 self.sched = scheduler(self, self.rqdata) 1836 logger.debug("Using runqueue scheduler '%s'", scheduler.name) 1837 break 1838 else: 1839 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1840 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1841 1842 #if self.rqdata.runq_setscene_tids: 1843 self.sqdata = SQData() 1844 build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self) 1845 1846 def runqueue_process_waitpid(self, task, status, fakerootlog=None): 1847 1848 # self.build_stamps[pid] may not exist when use shared work directory. 1849 if task in self.build_stamps: 1850 self.build_stamps2.remove(self.build_stamps[task]) 1851 del self.build_stamps[task] 1852 1853 if task in self.sq_live: 1854 if status != 0: 1855 self.sq_task_fail(task, status) 1856 else: 1857 self.sq_task_complete(task) 1858 self.sq_live.remove(task) 1859 self.stats.updateActiveSetscene(len(self.sq_live)) 1860 else: 1861 if status != 0: 1862 self.task_fail(task, status, fakerootlog=fakerootlog) 1863 else: 1864 self.task_complete(task) 1865 return True 1866 1867 def finish_now(self): 1868 for mc in self.rq.worker: 1869 try: 1870 self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") 1871 self.rq.worker[mc].process.stdin.flush() 1872 except IOError: 1873 # worker must have died? 1874 pass 1875 for mc in self.rq.fakeworker: 1876 try: 1877 self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") 1878 self.rq.fakeworker[mc].process.stdin.flush() 1879 except IOError: 1880 # worker must have died? 1881 pass 1882 1883 if self.failed_tids: 1884 self.rq.state = runQueueFailed 1885 return 1886 1887 self.rq.state = runQueueComplete 1888 return 1889 1890 def finish(self): 1891 self.rq.state = runQueueCleanUp 1892 1893 active = self.stats.active + len(self.sq_live) 1894 if active > 0: 1895 bb.event.fire(runQueueExitWait(active), self.cfgData) 1896 self.rq.read_workers() 1897 return self.rq.active_fds() 1898 1899 if self.failed_tids: 1900 self.rq.state = runQueueFailed 1901 return True 1902 1903 self.rq.state = runQueueComplete 1904 return True 1905 1906 # Used by setscene only 1907 def check_dependencies(self, task, taskdeps): 1908 if not self.rq.depvalidate: 1909 return False 1910 1911 # Must not edit parent data 1912 taskdeps = set(taskdeps) 1913 1914 taskdata = {} 1915 taskdeps.add(task) 1916 for dep in taskdeps: 1917 (mc, fn, taskname, taskfn) = split_tid_mcfn(dep) 1918 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1919 taskdata[dep] = [pn, taskname, fn] 1920 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)" 1921 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data } 1922 valid = bb.utils.better_eval(call, locs) 1923 return valid 1924 1925 def can_start_task(self): 1926 active = self.stats.active + len(self.sq_live) 1927 can_start = active < self.number_tasks 1928 return can_start 1929 1930 def get_schedulers(self): 1931 schedulers = set(obj for obj in globals().values() 1932 if type(obj) is type and 1933 issubclass(obj, RunQueueScheduler)) 1934 1935 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS") 1936 if user_schedulers: 1937 for sched in user_schedulers.split(): 1938 if not "." in sched: 1939 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) 1940 continue 1941 1942 modname, name = sched.rsplit(".", 1) 1943 try: 1944 module = __import__(modname, fromlist=(name,)) 1945 except ImportError as exc: 1946 logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) 1947 raise SystemExit(1) 1948 else: 1949 schedulers.add(getattr(module, name)) 1950 return schedulers 1951 1952 def setbuildable(self, task): 1953 self.runq_buildable.add(task) 1954 self.sched.newbuildable(task) 1955 1956 def task_completeoutright(self, task): 1957 """ 1958 Mark a task as completed 1959 Look at the reverse dependencies and mark any task with 1960 completed dependencies as buildable 1961 """ 1962 self.runq_complete.add(task) 1963 for revdep in self.rqdata.runtaskentries[task].revdeps: 1964 if revdep in self.runq_running: 1965 continue 1966 if revdep in self.runq_buildable: 1967 continue 1968 alldeps = True 1969 for dep in self.rqdata.runtaskentries[revdep].depends: 1970 if dep not in self.runq_complete: 1971 alldeps = False 1972 break 1973 if alldeps: 1974 self.setbuildable(revdep) 1975 logger.debug("Marking task %s as buildable", revdep) 1976 1977 for t in self.sq_deferred.copy(): 1978 if self.sq_deferred[t] == task: 1979 logger.debug2("Deferred task %s now buildable" % t) 1980 del self.sq_deferred[t] 1981 update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 1982 1983 def task_complete(self, task): 1984 self.stats.taskCompleted() 1985 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 1986 self.task_completeoutright(task) 1987 self.runq_tasksrun.add(task) 1988 1989 def task_fail(self, task, exitcode, fakerootlog=None): 1990 """ 1991 Called when a task has failed 1992 Updates the state engine with the failure 1993 """ 1994 self.stats.taskFailed() 1995 self.failed_tids.append(task) 1996 1997 fakeroot_log = [] 1998 if fakerootlog and os.path.exists(fakerootlog): 1999 with open(fakerootlog) as fakeroot_log_file: 2000 fakeroot_failed = False 2001 for line in reversed(fakeroot_log_file.readlines()): 2002 for fakeroot_error in ['mismatch', 'error', 'fatal']: 2003 if fakeroot_error in line.lower(): 2004 fakeroot_failed = True 2005 if 'doing new pid setup and server start' in line: 2006 break 2007 fakeroot_log.append(line) 2008 2009 if not fakeroot_failed: 2010 fakeroot_log = [] 2011 2012 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData) 2013 2014 if self.rqdata.taskData[''].halt: 2015 self.rq.state = runQueueCleanUp 2016 2017 def task_skip(self, task, reason): 2018 self.runq_running.add(task) 2019 self.setbuildable(task) 2020 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData) 2021 self.task_completeoutright(task) 2022 self.stats.taskSkipped() 2023 self.stats.taskCompleted() 2024 2025 def summarise_scenequeue_errors(self): 2026 err = False 2027 if not self.sqdone: 2028 logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) 2029 completeevent = sceneQueueComplete(self.stats, self.rq) 2030 bb.event.fire(completeevent, self.cfgData) 2031 if self.sq_deferred: 2032 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) 2033 err = True 2034 if self.updated_taskhash_queue: 2035 logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) 2036 err = True 2037 if self.holdoff_tasks: 2038 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) 2039 err = True 2040 2041 for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered): 2042 # No task should end up in both covered and uncovered, that is a bug. 2043 logger.error("Setscene task %s in both covered and notcovered." % tid) 2044 2045 for tid in self.rqdata.runq_setscene_tids: 2046 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2047 err = True 2048 logger.error("Setscene Task %s was never marked as covered or not covered" % tid) 2049 if tid not in self.sq_buildable: 2050 err = True 2051 logger.error("Setscene Task %s was never marked as buildable" % tid) 2052 if tid not in self.sq_running: 2053 err = True 2054 logger.error("Setscene Task %s was never marked as running" % tid) 2055 2056 for x in self.rqdata.runtaskentries: 2057 if x not in self.tasks_covered and x not in self.tasks_notcovered: 2058 logger.error("Task %s was never moved from the setscene queue" % x) 2059 err = True 2060 if x not in self.tasks_scenequeue_done: 2061 logger.error("Task %s was never processed by the setscene code" % x) 2062 err = True 2063 if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable: 2064 logger.error("Task %s was never marked as buildable by the setscene code" % x) 2065 err = True 2066 return err 2067 2068 2069 def execute(self): 2070 """ 2071 Run the tasks in a queue prepared by prepare_runqueue 2072 """ 2073 2074 self.rq.read_workers() 2075 if self.updated_taskhash_queue or self.pending_migrations: 2076 self.process_possible_migrations() 2077 2078 if not hasattr(self, "sorted_setscene_tids"): 2079 # Don't want to sort this set every execution 2080 self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids) 2081 2082 task = None 2083 if not self.sqdone and self.can_start_task(): 2084 # Find the next setscene to run 2085 for nexttask in self.sorted_setscene_tids: 2086 if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values(): 2087 if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): 2088 if nexttask not in self.rqdata.target_tids: 2089 logger.debug2("Skipping setscene for task %s" % nexttask) 2090 self.sq_task_skip(nexttask) 2091 self.scenequeue_notneeded.add(nexttask) 2092 if nexttask in self.sq_deferred: 2093 del self.sq_deferred[nexttask] 2094 return True 2095 # If covered tasks are running, need to wait for them to complete 2096 for t in self.sqdata.sq_covered_tasks[nexttask]: 2097 if t in self.runq_running and t not in self.runq_complete: 2098 continue 2099 if nexttask in self.sq_deferred: 2100 if self.sq_deferred[nexttask] not in self.runq_complete: 2101 continue 2102 logger.debug("Task %s no longer deferred" % nexttask) 2103 del self.sq_deferred[nexttask] 2104 valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False) 2105 if not valid: 2106 logger.debug("%s didn't become valid, skipping setscene" % nexttask) 2107 self.sq_task_failoutright(nexttask) 2108 return True 2109 if nexttask in self.sqdata.outrightfail: 2110 logger.debug2('No package found, so skipping setscene task %s', nexttask) 2111 self.sq_task_failoutright(nexttask) 2112 return True 2113 if nexttask in self.sqdata.unskippable: 2114 logger.debug2("Setscene task %s is unskippable" % nexttask) 2115 task = nexttask 2116 break 2117 if task is not None: 2118 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2119 taskname = taskname + "_setscene" 2120 if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): 2121 logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task) 2122 self.sq_task_failoutright(task) 2123 return True 2124 2125 if self.cooker.configuration.force: 2126 if task in self.rqdata.target_tids: 2127 self.sq_task_failoutright(task) 2128 return True 2129 2130 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2131 logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task) 2132 self.sq_task_skip(task) 2133 return True 2134 2135 if self.cooker.configuration.skipsetscene: 2136 logger.debug2('No setscene tasks should be executed. Skipping %s', task) 2137 self.sq_task_failoutright(task) 2138 return True 2139 2140 startevent = sceneQueueTaskStarted(task, self.stats, self.rq) 2141 bb.event.fire(startevent, self.cfgData) 2142 2143 taskdepdata = self.sq_build_taskdepdata(task) 2144 2145 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2146 taskhash = self.rqdata.get_task_hash(task) 2147 unihash = self.rqdata.get_task_unihash(task) 2148 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2149 if not mc in self.rq.fakeworker: 2150 self.rq.start_fakeworker(self, mc) 2151 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") 2152 self.rq.fakeworker[mc].process.stdin.flush() 2153 else: 2154 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") 2155 self.rq.worker[mc].process.stdin.flush() 2156 2157 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 2158 self.build_stamps2.append(self.build_stamps[task]) 2159 self.sq_running.add(task) 2160 self.sq_live.add(task) 2161 self.stats.updateActiveSetscene(len(self.sq_live)) 2162 if self.can_start_task(): 2163 return True 2164 2165 self.update_holdofftasks() 2166 2167 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: 2168 hashequiv_logger.verbose("Setscene tasks completed") 2169 2170 err = self.summarise_scenequeue_errors() 2171 if err: 2172 self.rq.state = runQueueFailed 2173 return True 2174 2175 if self.cooker.configuration.setsceneonly: 2176 self.rq.state = runQueueComplete 2177 return True 2178 self.sqdone = True 2179 2180 if self.stats.total == 0: 2181 # nothing to do 2182 self.rq.state = runQueueComplete 2183 return True 2184 2185 if self.cooker.configuration.setsceneonly: 2186 task = None 2187 else: 2188 task = self.sched.next() 2189 if task is not None: 2190 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2191 2192 if self.rqdata.setscene_ignore_tasks is not None: 2193 if self.check_setscene_ignore_tasks(task): 2194 self.task_fail(task, "setscene ignore_tasks") 2195 return True 2196 2197 if task in self.tasks_covered: 2198 logger.debug2("Setscene covered task %s", task) 2199 self.task_skip(task, "covered") 2200 return True 2201 2202 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2203 logger.debug2("Stamp current task %s", task) 2204 2205 self.task_skip(task, "existing") 2206 self.runq_tasksrun.add(task) 2207 return True 2208 2209 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2210 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2211 startevent = runQueueTaskStarted(task, self.stats, self.rq, 2212 noexec=True) 2213 bb.event.fire(startevent, self.cfgData) 2214 self.runq_running.add(task) 2215 self.stats.taskActive() 2216 if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2217 bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn) 2218 self.task_complete(task) 2219 return True 2220 else: 2221 startevent = runQueueTaskStarted(task, self.stats, self.rq) 2222 bb.event.fire(startevent, self.cfgData) 2223 2224 taskdepdata = self.build_taskdepdata(task) 2225 2226 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2227 taskhash = self.rqdata.get_task_hash(task) 2228 unihash = self.rqdata.get_task_unihash(task) 2229 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2230 if not mc in self.rq.fakeworker: 2231 try: 2232 self.rq.start_fakeworker(self, mc) 2233 except OSError as exc: 2234 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) 2235 self.rq.state = runQueueFailed 2236 self.stats.taskFailed() 2237 return True 2238 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") 2239 self.rq.fakeworker[mc].process.stdin.flush() 2240 else: 2241 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") 2242 self.rq.worker[mc].process.stdin.flush() 2243 2244 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 2245 self.build_stamps2.append(self.build_stamps[task]) 2246 self.runq_running.add(task) 2247 self.stats.taskActive() 2248 if self.can_start_task(): 2249 return True 2250 2251 if self.stats.active > 0 or self.sq_live: 2252 self.rq.read_workers() 2253 return self.rq.active_fds() 2254 2255 # No more tasks can be run. If we have deferred setscene tasks we should run them. 2256 if self.sq_deferred: 2257 deferred_tid = list(self.sq_deferred.keys())[0] 2258 blocking_tid = self.sq_deferred.pop(deferred_tid) 2259 logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid)) 2260 return True 2261 2262 if self.failed_tids: 2263 self.rq.state = runQueueFailed 2264 return True 2265 2266 # Sanity Checks 2267 err = self.summarise_scenequeue_errors() 2268 for task in self.rqdata.runtaskentries: 2269 if task not in self.runq_buildable: 2270 logger.error("Task %s never buildable!", task) 2271 err = True 2272 elif task not in self.runq_running: 2273 logger.error("Task %s never ran!", task) 2274 err = True 2275 elif task not in self.runq_complete: 2276 logger.error("Task %s never completed!", task) 2277 err = True 2278 2279 if err: 2280 self.rq.state = runQueueFailed 2281 else: 2282 self.rq.state = runQueueComplete 2283 2284 return True 2285 2286 def filtermcdeps(self, task, mc, deps): 2287 ret = set() 2288 for dep in deps: 2289 thismc = mc_from_tid(dep) 2290 if thismc != mc: 2291 continue 2292 ret.add(dep) 2293 return ret 2294 2295 # We filter out multiconfig dependencies from taskdepdata we pass to the tasks 2296 # as most code can't handle them 2297 def build_taskdepdata(self, task): 2298 taskdepdata = {} 2299 mc = mc_from_tid(task) 2300 next = self.rqdata.runtaskentries[task].depends.copy() 2301 next.add(task) 2302 next = self.filtermcdeps(task, mc, next) 2303 while next: 2304 additional = [] 2305 for revdep in next: 2306 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2307 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2308 deps = self.rqdata.runtaskentries[revdep].depends 2309 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2310 taskhash = self.rqdata.runtaskentries[revdep].hash 2311 unihash = self.rqdata.runtaskentries[revdep].unihash 2312 deps = self.filtermcdeps(task, mc, deps) 2313 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] 2314 for revdep2 in deps: 2315 if revdep2 not in taskdepdata: 2316 additional.append(revdep2) 2317 next = additional 2318 2319 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2320 return taskdepdata 2321 2322 def update_holdofftasks(self): 2323 2324 if not self.holdoff_need_update: 2325 return 2326 2327 notcovered = set(self.scenequeue_notcovered) 2328 notcovered |= self.cantskip 2329 for tid in self.scenequeue_notcovered: 2330 notcovered |= self.sqdata.sq_covered_tasks[tid] 2331 notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) 2332 notcovered.intersection_update(self.tasks_scenequeue_done) 2333 2334 covered = set(self.scenequeue_covered) 2335 for tid in self.scenequeue_covered: 2336 covered |= self.sqdata.sq_covered_tasks[tid] 2337 covered.difference_update(notcovered) 2338 covered.intersection_update(self.tasks_scenequeue_done) 2339 2340 for tid in notcovered | covered: 2341 if not self.rqdata.runtaskentries[tid].depends: 2342 self.setbuildable(tid) 2343 elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): 2344 self.setbuildable(tid) 2345 2346 self.tasks_covered = covered 2347 self.tasks_notcovered = notcovered 2348 2349 self.holdoff_tasks = set() 2350 2351 for tid in self.rqdata.runq_setscene_tids: 2352 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2353 self.holdoff_tasks.add(tid) 2354 2355 for tid in self.holdoff_tasks.copy(): 2356 for dep in self.sqdata.sq_covered_tasks[tid]: 2357 if dep not in self.runq_complete: 2358 self.holdoff_tasks.add(dep) 2359 2360 self.holdoff_need_update = False 2361 2362 def process_possible_migrations(self): 2363 2364 changed = set() 2365 toprocess = set() 2366 for tid, unihash in self.updated_taskhash_queue.copy(): 2367 if tid in self.runq_running and tid not in self.runq_complete: 2368 continue 2369 2370 self.updated_taskhash_queue.remove((tid, unihash)) 2371 2372 if unihash != self.rqdata.runtaskentries[tid].unihash: 2373 # Make sure we rehash any other tasks with the same task hash that we're deferred against. 2374 torehash = [tid] 2375 for deftid in self.sq_deferred: 2376 if self.sq_deferred[deftid] == tid: 2377 torehash.append(deftid) 2378 for hashtid in torehash: 2379 hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) 2380 self.rqdata.runtaskentries[hashtid].unihash = unihash 2381 bb.parse.siggen.set_unihash(hashtid, unihash) 2382 toprocess.add(hashtid) 2383 if torehash: 2384 # Need to save after set_unihash above 2385 bb.parse.siggen.save_unitaskhashes() 2386 2387 # Work out all tasks which depend upon these 2388 total = set() 2389 next = set() 2390 for p in toprocess: 2391 next |= self.rqdata.runtaskentries[p].revdeps 2392 while next: 2393 current = next.copy() 2394 total = total | next 2395 next = set() 2396 for ntid in current: 2397 next |= self.rqdata.runtaskentries[ntid].revdeps 2398 next.difference_update(total) 2399 2400 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash 2401 next = set() 2402 for p in total: 2403 if not self.rqdata.runtaskentries[p].depends: 2404 next.add(p) 2405 elif self.rqdata.runtaskentries[p].depends.isdisjoint(total): 2406 next.add(p) 2407 2408 # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled 2409 while next: 2410 current = next.copy() 2411 next = set() 2412 for tid in current: 2413 if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): 2414 continue 2415 orighash = self.rqdata.runtaskentries[tid].hash 2416 dc = bb.parse.siggen.get_data_caches(self.rqdata.dataCaches, mc_from_tid(tid)) 2417 newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, dc) 2418 origuni = self.rqdata.runtaskentries[tid].unihash 2419 newuni = bb.parse.siggen.get_unihash(tid) 2420 # FIXME, need to check it can come from sstate at all for determinism? 2421 remapped = False 2422 if newuni == origuni: 2423 # Nothing to do, we match, skip code below 2424 remapped = True 2425 elif tid in self.scenequeue_covered or tid in self.sq_live: 2426 # Already ran this setscene task or it running. Report the new taskhash 2427 bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches) 2428 hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid)) 2429 remapped = True 2430 2431 if not remapped: 2432 #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni)) 2433 self.rqdata.runtaskentries[tid].hash = newhash 2434 self.rqdata.runtaskentries[tid].unihash = newuni 2435 changed.add(tid) 2436 2437 next |= self.rqdata.runtaskentries[tid].revdeps 2438 total.remove(tid) 2439 next.intersection_update(total) 2440 2441 if changed: 2442 for mc in self.rq.worker: 2443 self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2444 for mc in self.rq.fakeworker: 2445 self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2446 2447 hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) 2448 2449 for tid in changed: 2450 if tid not in self.rqdata.runq_setscene_tids: 2451 continue 2452 if tid not in self.pending_migrations: 2453 self.pending_migrations.add(tid) 2454 2455 update_tasks = [] 2456 for tid in self.pending_migrations.copy(): 2457 if tid in self.runq_running or tid in self.sq_live: 2458 # Too late, task already running, not much we can do now 2459 self.pending_migrations.remove(tid) 2460 continue 2461 2462 valid = True 2463 # Check no tasks this covers are running 2464 for dep in self.sqdata.sq_covered_tasks[tid]: 2465 if dep in self.runq_running and dep not in self.runq_complete: 2466 hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid)) 2467 valid = False 2468 break 2469 if not valid: 2470 continue 2471 2472 self.pending_migrations.remove(tid) 2473 changed = True 2474 2475 if tid in self.tasks_scenequeue_done: 2476 self.tasks_scenequeue_done.remove(tid) 2477 for dep in self.sqdata.sq_covered_tasks[tid]: 2478 if dep in self.runq_complete and dep not in self.runq_tasksrun: 2479 bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep) 2480 self.failed_tids.append(tid) 2481 self.rq.state = runQueueCleanUp 2482 return 2483 2484 if dep not in self.runq_complete: 2485 if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable: 2486 self.tasks_scenequeue_done.remove(dep) 2487 2488 if tid in self.sq_buildable: 2489 self.sq_buildable.remove(tid) 2490 if tid in self.sq_running: 2491 self.sq_running.remove(tid) 2492 harddepfail = False 2493 for t in self.sqdata.sq_harddeps: 2494 if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered: 2495 harddepfail = True 2496 break 2497 if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2498 if tid not in self.sq_buildable: 2499 self.sq_buildable.add(tid) 2500 if not self.sqdata.sq_revdeps[tid]: 2501 self.sq_buildable.add(tid) 2502 2503 if tid in self.sqdata.outrightfail: 2504 self.sqdata.outrightfail.remove(tid) 2505 if tid in self.scenequeue_notcovered: 2506 self.scenequeue_notcovered.remove(tid) 2507 if tid in self.scenequeue_covered: 2508 self.scenequeue_covered.remove(tid) 2509 if tid in self.scenequeue_notneeded: 2510 self.scenequeue_notneeded.remove(tid) 2511 2512 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2513 self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) 2514 2515 if tid in self.stampcache: 2516 del self.stampcache[tid] 2517 2518 if tid in self.build_stamps: 2519 del self.build_stamps[tid] 2520 2521 update_tasks.append((tid, harddepfail, tid in self.sqdata.valid)) 2522 2523 if update_tasks: 2524 self.sqdone = False 2525 for mc in sorted(self.sqdata.multiconfigs): 2526 for tid in sorted([t[0] for t in update_tasks]): 2527 if mc_from_tid(tid) != mc: 2528 continue 2529 h = pending_hash_index(tid, self.rqdata) 2530 if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]: 2531 self.sq_deferred[tid] = self.sqdata.hashes[h] 2532 bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h])) 2533 update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2534 2535 for (tid, harddepfail, origvalid) in update_tasks: 2536 if tid in self.sqdata.valid and not origvalid: 2537 hashequiv_logger.verbose("Setscene task %s became valid" % tid) 2538 if harddepfail: 2539 self.sq_task_failoutright(tid) 2540 2541 if changed: 2542 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2543 self.holdoff_need_update = True 2544 2545 def scenequeue_updatecounters(self, task, fail=False): 2546 2547 for dep in sorted(self.sqdata.sq_deps[task]): 2548 if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: 2549 if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered: 2550 # dependency could be already processed, e.g. noexec setscene task 2551 continue 2552 noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache) 2553 if noexec or stamppresent: 2554 continue 2555 logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) 2556 self.sq_task_failoutright(dep) 2557 continue 2558 if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2559 if dep not in self.sq_buildable: 2560 self.sq_buildable.add(dep) 2561 2562 next = set([task]) 2563 while next: 2564 new = set() 2565 for t in sorted(next): 2566 self.tasks_scenequeue_done.add(t) 2567 # Look down the dependency chain for non-setscene things which this task depends on 2568 # and mark as 'done' 2569 for dep in self.rqdata.runtaskentries[t].depends: 2570 if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: 2571 continue 2572 if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): 2573 new.add(dep) 2574 next = new 2575 2576 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2577 self.holdoff_need_update = True 2578 2579 def sq_task_completeoutright(self, task): 2580 """ 2581 Mark a task as completed 2582 Look at the reverse dependencies and mark any task with 2583 completed dependencies as buildable 2584 """ 2585 2586 logger.debug('Found task %s which could be accelerated', task) 2587 self.scenequeue_covered.add(task) 2588 self.scenequeue_updatecounters(task) 2589 2590 def sq_check_taskfail(self, task): 2591 if self.rqdata.setscene_ignore_tasks is not None: 2592 realtask = task.split('_setscene')[0] 2593 (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) 2594 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2595 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2596 logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) 2597 self.rq.state = runQueueCleanUp 2598 2599 def sq_task_complete(self, task): 2600 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2601 self.sq_task_completeoutright(task) 2602 2603 def sq_task_fail(self, task, result): 2604 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData) 2605 self.scenequeue_notcovered.add(task) 2606 self.scenequeue_updatecounters(task, True) 2607 self.sq_check_taskfail(task) 2608 2609 def sq_task_failoutright(self, task): 2610 self.sq_running.add(task) 2611 self.sq_buildable.add(task) 2612 self.scenequeue_notcovered.add(task) 2613 self.scenequeue_updatecounters(task, True) 2614 2615 def sq_task_skip(self, task): 2616 self.sq_running.add(task) 2617 self.sq_buildable.add(task) 2618 self.sq_task_completeoutright(task) 2619 2620 def sq_build_taskdepdata(self, task): 2621 def getsetscenedeps(tid): 2622 deps = set() 2623 (mc, fn, taskname, _) = split_tid_mcfn(tid) 2624 realtid = tid + "_setscene" 2625 idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends 2626 for (depname, idependtask) in idepends: 2627 if depname not in self.rqdata.taskData[mc].build_targets: 2628 continue 2629 2630 depfn = self.rqdata.taskData[mc].build_targets[depname][0] 2631 if depfn is None: 2632 continue 2633 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2634 deps.add(deptid) 2635 return deps 2636 2637 taskdepdata = {} 2638 next = getsetscenedeps(task) 2639 next.add(task) 2640 while next: 2641 additional = [] 2642 for revdep in next: 2643 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2644 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2645 deps = getsetscenedeps(revdep) 2646 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2647 taskhash = self.rqdata.runtaskentries[revdep].hash 2648 unihash = self.rqdata.runtaskentries[revdep].unihash 2649 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] 2650 for revdep2 in deps: 2651 if revdep2 not in taskdepdata: 2652 additional.append(revdep2) 2653 next = additional 2654 2655 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2656 return taskdepdata 2657 2658 def check_setscene_ignore_tasks(self, tid): 2659 # Check task that is going to run against the ignore tasks list 2660 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2661 # Ignore covered tasks 2662 if tid in self.tasks_covered: 2663 return False 2664 # Ignore stamped tasks 2665 if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): 2666 return False 2667 # Ignore noexec tasks 2668 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2669 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2670 return False 2671 2672 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2673 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2674 if tid in self.rqdata.runq_setscene_tids: 2675 msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)] 2676 else: 2677 msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)] 2678 for t in self.scenequeue_notcovered: 2679 msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash)) 2680 msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered)) 2681 logger.error("".join(msg)) 2682 return True 2683 return False 2684 2685class SQData(object): 2686 def __init__(self): 2687 # SceneQueue dependencies 2688 self.sq_deps = {} 2689 # SceneQueue reverse dependencies 2690 self.sq_revdeps = {} 2691 # Injected inter-setscene task dependencies 2692 self.sq_harddeps = {} 2693 # Cache of stamp files so duplicates can't run in parallel 2694 self.stamps = {} 2695 # Setscene tasks directly depended upon by the build 2696 self.unskippable = set() 2697 # List of setscene tasks which aren't present 2698 self.outrightfail = set() 2699 # A list of normal tasks a setscene task covers 2700 self.sq_covered_tasks = {} 2701 2702def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): 2703 2704 sq_revdeps = {} 2705 sq_revdeps_squash = {} 2706 sq_collated_deps = {} 2707 2708 # We need to construct a dependency graph for the setscene functions. Intermediate 2709 # dependencies between the setscene tasks only complicate the code. This code 2710 # therefore aims to collapse the huge runqueue dependency tree into a smaller one 2711 # only containing the setscene functions. 2712 2713 rqdata.init_progress_reporter.next_stage() 2714 2715 # First process the chains up to the first setscene task. 2716 endpoints = {} 2717 for tid in rqdata.runtaskentries: 2718 sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) 2719 sq_revdeps_squash[tid] = set() 2720 if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids: 2721 #bb.warn("Added endpoint %s" % (tid)) 2722 endpoints[tid] = set() 2723 2724 rqdata.init_progress_reporter.next_stage() 2725 2726 # Secondly process the chains between setscene tasks. 2727 for tid in rqdata.runq_setscene_tids: 2728 sq_collated_deps[tid] = set() 2729 #bb.warn("Added endpoint 2 %s" % (tid)) 2730 for dep in rqdata.runtaskentries[tid].depends: 2731 if tid in sq_revdeps[dep]: 2732 sq_revdeps[dep].remove(tid) 2733 if dep not in endpoints: 2734 endpoints[dep] = set() 2735 #bb.warn(" Added endpoint 3 %s" % (dep)) 2736 endpoints[dep].add(tid) 2737 2738 rqdata.init_progress_reporter.next_stage() 2739 2740 def process_endpoints(endpoints): 2741 newendpoints = {} 2742 for point, task in endpoints.items(): 2743 tasks = set() 2744 if task: 2745 tasks |= task 2746 if sq_revdeps_squash[point]: 2747 tasks |= sq_revdeps_squash[point] 2748 if point not in rqdata.runq_setscene_tids: 2749 for t in tasks: 2750 sq_collated_deps[t].add(point) 2751 sq_revdeps_squash[point] = set() 2752 if point in rqdata.runq_setscene_tids: 2753 sq_revdeps_squash[point] = tasks 2754 continue 2755 for dep in rqdata.runtaskentries[point].depends: 2756 if point in sq_revdeps[dep]: 2757 sq_revdeps[dep].remove(point) 2758 if tasks: 2759 sq_revdeps_squash[dep] |= tasks 2760 if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids: 2761 newendpoints[dep] = task 2762 if newendpoints: 2763 process_endpoints(newendpoints) 2764 2765 process_endpoints(endpoints) 2766 2767 rqdata.init_progress_reporter.next_stage() 2768 2769 # Build a list of tasks which are "unskippable" 2770 # These are direct endpoints referenced by the build upto and including setscene tasks 2771 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon 2772 new = True 2773 for tid in rqdata.runtaskentries: 2774 if not rqdata.runtaskentries[tid].revdeps: 2775 sqdata.unskippable.add(tid) 2776 sqdata.unskippable |= sqrq.cantskip 2777 while new: 2778 new = False 2779 orig = sqdata.unskippable.copy() 2780 for tid in sorted(orig, reverse=True): 2781 if tid in rqdata.runq_setscene_tids: 2782 continue 2783 if not rqdata.runtaskentries[tid].depends: 2784 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable 2785 sqrq.setbuildable(tid) 2786 sqdata.unskippable |= rqdata.runtaskentries[tid].depends 2787 if sqdata.unskippable != orig: 2788 new = True 2789 2790 sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids) 2791 2792 rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) 2793 2794 # Sanity check all dependencies could be changed to setscene task references 2795 for taskcounter, tid in enumerate(rqdata.runtaskentries): 2796 if tid in rqdata.runq_setscene_tids: 2797 pass 2798 elif sq_revdeps_squash[tid]: 2799 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.") 2800 else: 2801 del sq_revdeps_squash[tid] 2802 rqdata.init_progress_reporter.update(taskcounter) 2803 2804 rqdata.init_progress_reporter.next_stage() 2805 2806 # Resolve setscene inter-task dependencies 2807 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" 2808 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies 2809 for tid in rqdata.runq_setscene_tids: 2810 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2811 realtid = tid + "_setscene" 2812 idepends = rqdata.taskData[mc].taskentries[realtid].idepends 2813 sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", rqdata.dataCaches[mc], taskfn, noextra=True) 2814 for (depname, idependtask) in idepends: 2815 2816 if depname not in rqdata.taskData[mc].build_targets: 2817 continue 2818 2819 depfn = rqdata.taskData[mc].build_targets[depname][0] 2820 if depfn is None: 2821 continue 2822 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2823 if deptid not in rqdata.runtaskentries: 2824 bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) 2825 2826 if not deptid in sqdata.sq_harddeps: 2827 sqdata.sq_harddeps[deptid] = set() 2828 sqdata.sq_harddeps[deptid].add(tid) 2829 2830 sq_revdeps_squash[tid].add(deptid) 2831 # Have to zero this to avoid circular dependencies 2832 sq_revdeps_squash[deptid] = set() 2833 2834 rqdata.init_progress_reporter.next_stage() 2835 2836 for task in sqdata.sq_harddeps: 2837 for dep in sqdata.sq_harddeps[task]: 2838 sq_revdeps_squash[dep].add(task) 2839 2840 rqdata.init_progress_reporter.next_stage() 2841 2842 #for tid in sq_revdeps_squash: 2843 # data = "" 2844 # for dep in sq_revdeps_squash[tid]: 2845 # data = data + "\n %s" % dep 2846 # bb.warn("Task %s_setscene: is %s " % (tid, data)) 2847 2848 sqdata.sq_revdeps = sq_revdeps_squash 2849 sqdata.sq_covered_tasks = sq_collated_deps 2850 2851 # Build reverse version of revdeps to populate deps structure 2852 for tid in sqdata.sq_revdeps: 2853 sqdata.sq_deps[tid] = set() 2854 for tid in sqdata.sq_revdeps: 2855 for dep in sqdata.sq_revdeps[tid]: 2856 sqdata.sq_deps[dep].add(tid) 2857 2858 rqdata.init_progress_reporter.next_stage() 2859 2860 sqdata.multiconfigs = set() 2861 for tid in sqdata.sq_revdeps: 2862 sqdata.multiconfigs.add(mc_from_tid(tid)) 2863 if not sqdata.sq_revdeps[tid]: 2864 sqrq.sq_buildable.add(tid) 2865 2866 rqdata.init_progress_reporter.finish() 2867 2868 sqdata.noexec = set() 2869 sqdata.stamppresent = set() 2870 sqdata.valid = set() 2871 2872 sqdata.hashes = {} 2873 sqrq.sq_deferred = {} 2874 for mc in sorted(sqdata.multiconfigs): 2875 for tid in sorted(sqdata.sq_revdeps): 2876 if mc_from_tid(tid) != mc: 2877 continue 2878 h = pending_hash_index(tid, rqdata) 2879 if h not in sqdata.hashes: 2880 sqdata.hashes[h] = tid 2881 else: 2882 sqrq.sq_deferred[tid] = sqdata.hashes[h] 2883 bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h])) 2884 2885 update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True) 2886 2887 # Compute a list of 'stale' sstate tasks where the current hash does not match the one 2888 # in any stamp files. Pass the list out to metadata as an event. 2889 found = {} 2890 for tid in rqdata.runq_setscene_tids: 2891 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2892 stamps = bb.build.find_stale_stamps(taskname, rqdata.dataCaches[mc], taskfn) 2893 if stamps: 2894 if mc not in found: 2895 found[mc] = {} 2896 found[mc][tid] = stamps 2897 for mc in found: 2898 event = bb.event.StaleSetSceneTasks(found[mc]) 2899 bb.event.fire(event, cooker.databuilder.mcdata[mc]) 2900 2901def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False): 2902 2903 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2904 2905 taskdep = rqdata.dataCaches[mc].task_deps[taskfn] 2906 2907 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2908 bb.build.make_stamp(taskname + "_setscene", rqdata.dataCaches[mc], taskfn) 2909 return True, False 2910 2911 if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache): 2912 logger.debug2('Setscene stamp current for task %s', tid) 2913 return False, True 2914 2915 if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache): 2916 logger.debug2('Normal stamp current for task %s', tid) 2917 return False, True 2918 2919 return False, False 2920 2921def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): 2922 2923 tocheck = set() 2924 2925 for tid in sorted(tids): 2926 if tid in sqdata.stamppresent: 2927 sqdata.stamppresent.remove(tid) 2928 if tid in sqdata.valid: 2929 sqdata.valid.remove(tid) 2930 if tid in sqdata.outrightfail: 2931 sqdata.outrightfail.remove(tid) 2932 2933 noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True) 2934 2935 if noexec: 2936 sqdata.noexec.add(tid) 2937 sqrq.sq_task_skip(tid) 2938 continue 2939 2940 if stamppresent: 2941 sqdata.stamppresent.add(tid) 2942 sqrq.sq_task_skip(tid) 2943 continue 2944 2945 tocheck.add(tid) 2946 2947 sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) 2948 2949 for tid in tids: 2950 if tid in sqdata.stamppresent: 2951 continue 2952 if tid in sqdata.valid: 2953 continue 2954 if tid in sqdata.noexec: 2955 continue 2956 if tid in sqrq.scenequeue_covered: 2957 continue 2958 if tid in sqrq.scenequeue_notcovered: 2959 continue 2960 if tid in sqrq.sq_deferred: 2961 continue 2962 sqdata.outrightfail.add(tid) 2963 2964class TaskFailure(Exception): 2965 """ 2966 Exception raised when a task in a runqueue fails 2967 """ 2968 def __init__(self, x): 2969 self.args = x 2970 2971 2972class runQueueExitWait(bb.event.Event): 2973 """ 2974 Event when waiting for task processes to exit 2975 """ 2976 2977 def __init__(self, remain): 2978 self.remain = remain 2979 self.message = "Waiting for %s active tasks to finish" % remain 2980 bb.event.Event.__init__(self) 2981 2982class runQueueEvent(bb.event.Event): 2983 """ 2984 Base runQueue event class 2985 """ 2986 def __init__(self, task, stats, rq): 2987 self.taskid = task 2988 self.taskstring = task 2989 self.taskname = taskname_from_tid(task) 2990 self.taskfile = fn_from_tid(task) 2991 self.taskhash = rq.rqdata.get_task_hash(task) 2992 self.stats = stats.copy() 2993 bb.event.Event.__init__(self) 2994 2995class sceneQueueEvent(runQueueEvent): 2996 """ 2997 Base sceneQueue event class 2998 """ 2999 def __init__(self, task, stats, rq, noexec=False): 3000 runQueueEvent.__init__(self, task, stats, rq) 3001 self.taskstring = task + "_setscene" 3002 self.taskname = taskname_from_tid(task) + "_setscene" 3003 self.taskfile = fn_from_tid(task) 3004 self.taskhash = rq.rqdata.get_task_hash(task) 3005 3006class runQueueTaskStarted(runQueueEvent): 3007 """ 3008 Event notifying a task was started 3009 """ 3010 def __init__(self, task, stats, rq, noexec=False): 3011 runQueueEvent.__init__(self, task, stats, rq) 3012 self.noexec = noexec 3013 3014class sceneQueueTaskStarted(sceneQueueEvent): 3015 """ 3016 Event notifying a setscene task was started 3017 """ 3018 def __init__(self, task, stats, rq, noexec=False): 3019 sceneQueueEvent.__init__(self, task, stats, rq) 3020 self.noexec = noexec 3021 3022class runQueueTaskFailed(runQueueEvent): 3023 """ 3024 Event notifying a task failed 3025 """ 3026 def __init__(self, task, stats, exitcode, rq, fakeroot_log=None): 3027 runQueueEvent.__init__(self, task, stats, rq) 3028 self.exitcode = exitcode 3029 self.fakeroot_log = fakeroot_log 3030 3031 def __str__(self): 3032 if self.fakeroot_log: 3033 return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log) 3034 else: 3035 return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) 3036 3037class sceneQueueTaskFailed(sceneQueueEvent): 3038 """ 3039 Event notifying a setscene task failed 3040 """ 3041 def __init__(self, task, stats, exitcode, rq): 3042 sceneQueueEvent.__init__(self, task, stats, rq) 3043 self.exitcode = exitcode 3044 3045 def __str__(self): 3046 return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode) 3047 3048class sceneQueueComplete(sceneQueueEvent): 3049 """ 3050 Event when all the sceneQueue tasks are complete 3051 """ 3052 def __init__(self, stats, rq): 3053 self.stats = stats.copy() 3054 bb.event.Event.__init__(self) 3055 3056class runQueueTaskCompleted(runQueueEvent): 3057 """ 3058 Event notifying a task completed 3059 """ 3060 3061class sceneQueueTaskCompleted(sceneQueueEvent): 3062 """ 3063 Event notifying a setscene task completed 3064 """ 3065 3066class runQueueTaskSkipped(runQueueEvent): 3067 """ 3068 Event notifying a task was skipped 3069 """ 3070 def __init__(self, task, stats, rq, reason): 3071 runQueueEvent.__init__(self, task, stats, rq) 3072 self.reason = reason 3073 3074class taskUniHashUpdate(bb.event.Event): 3075 """ 3076 Base runQueue event class 3077 """ 3078 def __init__(self, task, unihash): 3079 self.taskid = task 3080 self.unihash = unihash 3081 bb.event.Event.__init__(self) 3082 3083class runQueuePipe(): 3084 """ 3085 Abstraction for a pipe between a worker thread and the server 3086 """ 3087 def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None): 3088 self.input = pipein 3089 if pipeout: 3090 pipeout.close() 3091 bb.utils.nonblockingfd(self.input) 3092 self.queue = b"" 3093 self.d = d 3094 self.rq = rq 3095 self.rqexec = rqexec 3096 self.fakerootlogs = fakerootlogs 3097 3098 def setrunqueueexec(self, rqexec): 3099 self.rqexec = rqexec 3100 3101 def read(self): 3102 for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]: 3103 for worker in workers.values(): 3104 worker.process.poll() 3105 if worker.process.returncode is not None and not self.rq.teardown: 3106 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode))) 3107 self.rq.finish_runqueue(True) 3108 3109 start = len(self.queue) 3110 try: 3111 self.queue = self.queue + (self.input.read(102400) or b"") 3112 except (OSError, IOError) as e: 3113 if e.errno != errno.EAGAIN: 3114 raise 3115 end = len(self.queue) 3116 found = True 3117 while found and self.queue: 3118 found = False 3119 index = self.queue.find(b"</event>") 3120 while index != -1 and self.queue.startswith(b"<event>"): 3121 try: 3122 event = pickle.loads(self.queue[7:index]) 3123 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3124 if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e): 3125 # The pickled data could contain "</event>" so search for the next occurance 3126 # unpickling again, this should be the only way an unpickle error could occur 3127 index = self.queue.find(b"</event>", index + 1) 3128 continue 3129 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) 3130 bb.event.fire_from_worker(event, self.d) 3131 if isinstance(event, taskUniHashUpdate): 3132 self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) 3133 found = True 3134 self.queue = self.queue[index+8:] 3135 index = self.queue.find(b"</event>") 3136 index = self.queue.find(b"</exitcode>") 3137 while index != -1 and self.queue.startswith(b"<exitcode>"): 3138 try: 3139 task, status = pickle.loads(self.queue[10:index]) 3140 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3141 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) 3142 (_, _, _, taskfn) = split_tid_mcfn(task) 3143 fakerootlog = None 3144 if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs: 3145 fakerootlog = self.fakerootlogs[taskfn] 3146 self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog) 3147 found = True 3148 self.queue = self.queue[index+11:] 3149 index = self.queue.find(b"</exitcode>") 3150 return (end > start) 3151 3152 def close(self): 3153 while self.read(): 3154 continue 3155 if self.queue: 3156 print("Warning, worker left partial message: %s" % self.queue) 3157 self.input.close() 3158 3159def get_setscene_enforce_ignore_tasks(d, targets): 3160 if d.getVar('BB_SETSCENE_ENFORCE') != '1': 3161 return None 3162 ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split() 3163 outlist = [] 3164 for item in ignore_tasks[:]: 3165 if item.startswith('%:'): 3166 for (mc, target, task, fn) in targets: 3167 outlist.append(target + ':' + item.split(':')[1]) 3168 else: 3169 outlist.append(item) 3170 return outlist 3171 3172def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks): 3173 import fnmatch 3174 if ignore_tasks is not None: 3175 item = '%s:%s' % (pn, taskname) 3176 for ignore_tasks in ignore_tasks: 3177 if fnmatch.fnmatch(item, ignore_tasks): 3178 return True 3179 return False 3180 return True 3181