1""" 2BitBake 'RunQueue' implementation 3 4Handles preparation and execution of a queue of tasks 5""" 6 7# Copyright (C) 2006-2007 Richard Purdie 8# 9# SPDX-License-Identifier: GPL-2.0-only 10# 11 12import copy 13import os 14import sys 15import stat 16import errno 17import logging 18import re 19import bb 20from bb import msg, event 21from bb import monitordisk 22import subprocess 23import pickle 24from multiprocessing import Process 25import shlex 26import pprint 27 28bblogger = logging.getLogger("BitBake") 29logger = logging.getLogger("BitBake.RunQueue") 30hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv") 31 32__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' ) 33 34def fn_from_tid(tid): 35 return tid.rsplit(":", 1)[0] 36 37def taskname_from_tid(tid): 38 return tid.rsplit(":", 1)[1] 39 40def mc_from_tid(tid): 41 if tid.startswith('mc:'): 42 return tid.split(':')[1] 43 return "" 44 45def split_tid(tid): 46 (mc, fn, taskname, _) = split_tid_mcfn(tid) 47 return (mc, fn, taskname) 48 49def split_tid_mcfn(tid): 50 if tid.startswith('mc:'): 51 elems = tid.split(':') 52 mc = elems[1] 53 fn = ":".join(elems[2:-1]) 54 taskname = elems[-1] 55 mcfn = "mc:" + mc + ":" + fn 56 else: 57 tid = tid.rsplit(":", 1) 58 mc = "" 59 fn = tid[0] 60 taskname = tid[1] 61 mcfn = fn 62 63 return (mc, fn, taskname, mcfn) 64 65def build_tid(mc, fn, taskname): 66 if mc: 67 return "mc:" + mc + ":" + fn + ":" + taskname 68 return fn + ":" + taskname 69 70# Index used to pair up potentially matching multiconfig tasks 71# We match on PN, taskname and hash being equal 72def pending_hash_index(tid, rqdata): 73 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 74 pn = rqdata.dataCaches[mc].pkg_fn[taskfn] 75 h = rqdata.runtaskentries[tid].unihash 76 return pn + ":" + "taskname" + h 77 78class RunQueueStats: 79 """ 80 Holds statistics on the tasks handled by the associated runQueue 81 """ 82 def __init__(self, total): 83 self.completed = 0 84 self.skipped = 0 85 self.failed = 0 86 self.active = 0 87 self.total = total 88 89 def copy(self): 90 obj = self.__class__(self.total) 91 obj.__dict__.update(self.__dict__) 92 return obj 93 94 def taskFailed(self): 95 self.active = self.active - 1 96 self.failed = self.failed + 1 97 98 def taskCompleted(self): 99 self.active = self.active - 1 100 self.completed = self.completed + 1 101 102 def taskSkipped(self): 103 self.active = self.active + 1 104 self.skipped = self.skipped + 1 105 106 def taskActive(self): 107 self.active = self.active + 1 108 109# These values indicate the next step due to be run in the 110# runQueue state machine 111runQueuePrepare = 2 112runQueueSceneInit = 3 113runQueueRunning = 6 114runQueueFailed = 7 115runQueueCleanUp = 8 116runQueueComplete = 9 117 118class RunQueueScheduler(object): 119 """ 120 Control the order tasks are scheduled in. 121 """ 122 name = "basic" 123 124 def __init__(self, runqueue, rqdata): 125 """ 126 The default scheduler just returns the first buildable task (the 127 priority map is sorted by task number) 128 """ 129 self.rq = runqueue 130 self.rqdata = rqdata 131 self.numTasks = len(self.rqdata.runtaskentries) 132 133 self.prio_map = [self.rqdata.runtaskentries.keys()] 134 135 self.buildable = set() 136 self.skip_maxthread = {} 137 self.stamps = {} 138 for tid in self.rqdata.runtaskentries: 139 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 140 self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 141 if tid in self.rq.runq_buildable: 142 self.buildable.append(tid) 143 144 self.rev_prio_map = None 145 146 def next_buildable_task(self): 147 """ 148 Return the id of the first task we find that is buildable 149 """ 150 # Once tasks are running we don't need to worry about them again 151 self.buildable.difference_update(self.rq.runq_running) 152 buildable = set(self.buildable) 153 buildable.difference_update(self.rq.holdoff_tasks) 154 buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered) 155 if not buildable: 156 return None 157 158 # Filter out tasks that have a max number of threads that have been exceeded 159 skip_buildable = {} 160 for running in self.rq.runq_running.difference(self.rq.runq_complete): 161 rtaskname = taskname_from_tid(running) 162 if rtaskname not in self.skip_maxthread: 163 self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads") 164 if not self.skip_maxthread[rtaskname]: 165 continue 166 if rtaskname in skip_buildable: 167 skip_buildable[rtaskname] += 1 168 else: 169 skip_buildable[rtaskname] = 1 170 171 if len(buildable) == 1: 172 tid = buildable.pop() 173 taskname = taskname_from_tid(tid) 174 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 175 return None 176 stamp = self.stamps[tid] 177 if stamp not in self.rq.build_stamps.values(): 178 return tid 179 180 if not self.rev_prio_map: 181 self.rev_prio_map = {} 182 for tid in self.rqdata.runtaskentries: 183 self.rev_prio_map[tid] = self.prio_map.index(tid) 184 185 best = None 186 bestprio = None 187 for tid in buildable: 188 taskname = taskname_from_tid(tid) 189 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 190 continue 191 prio = self.rev_prio_map[tid] 192 if bestprio is None or bestprio > prio: 193 stamp = self.stamps[tid] 194 if stamp in self.rq.build_stamps.values(): 195 continue 196 bestprio = prio 197 best = tid 198 199 return best 200 201 def next(self): 202 """ 203 Return the id of the task we should build next 204 """ 205 if self.rq.can_start_task(): 206 return self.next_buildable_task() 207 208 def newbuildable(self, task): 209 self.buildable.add(task) 210 211 def removebuildable(self, task): 212 self.buildable.remove(task) 213 214 def describe_task(self, taskid): 215 result = 'ID %s' % taskid 216 if self.rev_prio_map: 217 result = result + (' pri %d' % self.rev_prio_map[taskid]) 218 return result 219 220 def dump_prio(self, comment): 221 bb.debug(3, '%s (most important first):\n%s' % 222 (comment, 223 '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for 224 index, taskid in enumerate(self.prio_map)]))) 225 226class RunQueueSchedulerSpeed(RunQueueScheduler): 227 """ 228 A scheduler optimised for speed. The priority map is sorted by task weight, 229 heavier weighted tasks (tasks needed by the most other tasks) are run first. 230 """ 231 name = "speed" 232 233 def __init__(self, runqueue, rqdata): 234 """ 235 The priority map is sorted by task weight. 236 """ 237 RunQueueScheduler.__init__(self, runqueue, rqdata) 238 239 weights = {} 240 for tid in self.rqdata.runtaskentries: 241 weight = self.rqdata.runtaskentries[tid].weight 242 if not weight in weights: 243 weights[weight] = [] 244 weights[weight].append(tid) 245 246 self.prio_map = [] 247 for weight in sorted(weights): 248 for w in weights[weight]: 249 self.prio_map.append(w) 250 251 self.prio_map.reverse() 252 253class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): 254 """ 255 A scheduler optimised to complete .bb files as quickly as possible. The 256 priority map is sorted by task weight, but then reordered so once a given 257 .bb file starts to build, it's completed as quickly as possible by 258 running all tasks related to the same .bb file one after the after. 259 This works well where disk space is at a premium and classes like OE's 260 rm_work are in force. 261 """ 262 name = "completion" 263 264 def __init__(self, runqueue, rqdata): 265 super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata) 266 267 # Extract list of tasks for each recipe, with tasks sorted 268 # ascending from "must run first" (typically do_fetch) to 269 # "runs last" (do_build). The speed scheduler prioritizes 270 # tasks that must run first before the ones that run later; 271 # this is what we depend on here. 272 task_lists = {} 273 for taskid in self.prio_map: 274 fn, taskname = taskid.rsplit(':', 1) 275 task_lists.setdefault(fn, []).append(taskname) 276 277 # Now unify the different task lists. The strategy is that 278 # common tasks get skipped and new ones get inserted after the 279 # preceeding common one(s) as they are found. Because task 280 # lists should differ only by their number of tasks, but not 281 # the ordering of the common tasks, this should result in a 282 # deterministic result that is a superset of the individual 283 # task ordering. 284 all_tasks = [] 285 for recipe, new_tasks in task_lists.items(): 286 index = 0 287 old_task = all_tasks[index] if index < len(all_tasks) else None 288 for new_task in new_tasks: 289 if old_task == new_task: 290 # Common task, skip it. This is the fast-path which 291 # avoids a full search. 292 index += 1 293 old_task = all_tasks[index] if index < len(all_tasks) else None 294 else: 295 try: 296 index = all_tasks.index(new_task) 297 # Already present, just not at the current 298 # place. We re-synchronized by changing the 299 # index so that it matches again. Now 300 # move on to the next existing task. 301 index += 1 302 old_task = all_tasks[index] if index < len(all_tasks) else None 303 except ValueError: 304 # Not present. Insert before old_task, which 305 # remains the same (but gets shifted back). 306 all_tasks.insert(index, new_task) 307 index += 1 308 bb.debug(3, 'merged task list: %s' % all_tasks) 309 310 # Now reverse the order so that tasks that finish the work on one 311 # recipe are considered more imporant (= come first). The ordering 312 # is now so that do_build is most important. 313 all_tasks.reverse() 314 315 # Group tasks of the same kind before tasks of less important 316 # kinds at the head of the queue (because earlier = lower 317 # priority number = runs earlier), while preserving the 318 # ordering by recipe. If recipe foo is more important than 319 # bar, then the goal is to work on foo's do_populate_sysroot 320 # before bar's do_populate_sysroot and on the more important 321 # tasks of foo before any of the less important tasks in any 322 # other recipe (if those other recipes are more important than 323 # foo). 324 # 325 # All of this only applies when tasks are runable. Explicit 326 # dependencies still override this ordering by priority. 327 # 328 # Here's an example why this priority re-ordering helps with 329 # minimizing disk usage. Consider a recipe foo with a higher 330 # priority than bar where foo DEPENDS on bar. Then the 331 # implicit rule (from base.bbclass) is that foo's do_configure 332 # depends on bar's do_populate_sysroot. This ensures that 333 # bar's do_populate_sysroot gets done first. Normally the 334 # tasks from foo would continue to run once that is done, and 335 # bar only gets completed and cleaned up later. By ordering 336 # bar's task that depend on bar's do_populate_sysroot before foo's 337 # do_configure, that problem gets avoided. 338 task_index = 0 339 self.dump_prio('original priorities') 340 for task in all_tasks: 341 for index in range(task_index, self.numTasks): 342 taskid = self.prio_map[index] 343 taskname = taskid.rsplit(':', 1)[1] 344 if taskname == task: 345 del self.prio_map[index] 346 self.prio_map.insert(task_index, taskid) 347 task_index += 1 348 self.dump_prio('completion priorities') 349 350class RunTaskEntry(object): 351 def __init__(self): 352 self.depends = set() 353 self.revdeps = set() 354 self.hash = None 355 self.unihash = None 356 self.task = None 357 self.weight = 1 358 359class RunQueueData: 360 """ 361 BitBake Run Queue implementation 362 """ 363 def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets): 364 self.cooker = cooker 365 self.dataCaches = dataCaches 366 self.taskData = taskData 367 self.targets = targets 368 self.rq = rq 369 self.warn_multi_bb = False 370 371 self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST") or "" 372 self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST") or "").split() 373 self.setscenewhitelist = get_setscene_enforce_whitelist(cfgData) 374 self.setscenewhitelist_checked = False 375 self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1") 376 self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter() 377 378 self.reset() 379 380 def reset(self): 381 self.runtaskentries = {} 382 383 def runq_depends_names(self, ids): 384 import re 385 ret = [] 386 for id in ids: 387 nam = os.path.basename(id) 388 nam = re.sub("_[^,]*,", ",", nam) 389 ret.extend([nam]) 390 return ret 391 392 def get_task_hash(self, tid): 393 return self.runtaskentries[tid].hash 394 395 def get_task_unihash(self, tid): 396 return self.runtaskentries[tid].unihash 397 398 def get_user_idstring(self, tid, task_name_suffix = ""): 399 return tid + task_name_suffix 400 401 def get_short_user_idstring(self, task, task_name_suffix = ""): 402 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 403 pn = self.dataCaches[mc].pkg_fn[taskfn] 404 taskname = taskname_from_tid(task) + task_name_suffix 405 return "%s:%s" % (pn, taskname) 406 407 def circular_depchains_handler(self, tasks): 408 """ 409 Some tasks aren't buildable, likely due to circular dependency issues. 410 Identify the circular dependencies and print them in a user readable format. 411 """ 412 from copy import deepcopy 413 414 valid_chains = [] 415 explored_deps = {} 416 msgs = [] 417 418 class TooManyLoops(Exception): 419 pass 420 421 def chain_reorder(chain): 422 """ 423 Reorder a dependency chain so the lowest task id is first 424 """ 425 lowest = 0 426 new_chain = [] 427 for entry in range(len(chain)): 428 if chain[entry] < chain[lowest]: 429 lowest = entry 430 new_chain.extend(chain[lowest:]) 431 new_chain.extend(chain[:lowest]) 432 return new_chain 433 434 def chain_compare_equal(chain1, chain2): 435 """ 436 Compare two dependency chains and see if they're the same 437 """ 438 if len(chain1) != len(chain2): 439 return False 440 for index in range(len(chain1)): 441 if chain1[index] != chain2[index]: 442 return False 443 return True 444 445 def chain_array_contains(chain, chain_array): 446 """ 447 Return True if chain_array contains chain 448 """ 449 for ch in chain_array: 450 if chain_compare_equal(ch, chain): 451 return True 452 return False 453 454 def find_chains(tid, prev_chain): 455 prev_chain.append(tid) 456 total_deps = [] 457 total_deps.extend(self.runtaskentries[tid].revdeps) 458 for revdep in self.runtaskentries[tid].revdeps: 459 if revdep in prev_chain: 460 idx = prev_chain.index(revdep) 461 # To prevent duplicates, reorder the chain to start with the lowest taskid 462 # and search through an array of those we've already printed 463 chain = prev_chain[idx:] 464 new_chain = chain_reorder(chain) 465 if not chain_array_contains(new_chain, valid_chains): 466 valid_chains.append(new_chain) 467 msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) 468 for dep in new_chain: 469 msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends))) 470 msgs.append("\n") 471 if len(valid_chains) > 10: 472 msgs.append("Aborted dependency loops search after 10 matches.\n") 473 raise TooManyLoops 474 continue 475 scan = False 476 if revdep not in explored_deps: 477 scan = True 478 elif revdep in explored_deps[revdep]: 479 scan = True 480 else: 481 for dep in prev_chain: 482 if dep in explored_deps[revdep]: 483 scan = True 484 if scan: 485 find_chains(revdep, copy.deepcopy(prev_chain)) 486 for dep in explored_deps[revdep]: 487 if dep not in total_deps: 488 total_deps.append(dep) 489 490 explored_deps[tid] = total_deps 491 492 try: 493 for task in tasks: 494 find_chains(task, []) 495 except TooManyLoops: 496 pass 497 498 return msgs 499 500 def calculate_task_weights(self, endpoints): 501 """ 502 Calculate a number representing the "weight" of each task. Heavier weighted tasks 503 have more dependencies and hence should be executed sooner for maximum speed. 504 505 This function also sanity checks the task list finding tasks that are not 506 possible to execute due to circular dependencies. 507 """ 508 509 numTasks = len(self.runtaskentries) 510 weight = {} 511 deps_left = {} 512 task_done = {} 513 514 for tid in self.runtaskentries: 515 task_done[tid] = False 516 weight[tid] = 1 517 deps_left[tid] = len(self.runtaskentries[tid].revdeps) 518 519 for tid in endpoints: 520 weight[tid] = 10 521 task_done[tid] = True 522 523 while True: 524 next_points = [] 525 for tid in endpoints: 526 for revdep in self.runtaskentries[tid].depends: 527 weight[revdep] = weight[revdep] + weight[tid] 528 deps_left[revdep] = deps_left[revdep] - 1 529 if deps_left[revdep] == 0: 530 next_points.append(revdep) 531 task_done[revdep] = True 532 endpoints = next_points 533 if len(next_points) == 0: 534 break 535 536 # Circular dependency sanity check 537 problem_tasks = [] 538 for tid in self.runtaskentries: 539 if task_done[tid] is False or deps_left[tid] != 0: 540 problem_tasks.append(tid) 541 logger.debug(2, "Task %s is not buildable", tid) 542 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) 543 self.runtaskentries[tid].weight = weight[tid] 544 545 if problem_tasks: 546 message = "%s unbuildable tasks were found.\n" % len(problem_tasks) 547 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" 548 message = message + "Identifying dependency loops (this may take a short while)...\n" 549 logger.error(message) 550 551 msgs = self.circular_depchains_handler(problem_tasks) 552 553 message = "\n" 554 for msg in msgs: 555 message = message + msg 556 bb.msg.fatal("RunQueue", message) 557 558 return weight 559 560 def prepare(self): 561 """ 562 Turn a set of taskData into a RunQueue and compute data needed 563 to optimise the execution order. 564 """ 565 566 runq_build = {} 567 recursivetasks = {} 568 recursiveitasks = {} 569 recursivetasksselfref = set() 570 571 taskData = self.taskData 572 573 found = False 574 for mc in self.taskData: 575 if len(taskData[mc].taskentries) > 0: 576 found = True 577 break 578 if not found: 579 # Nothing to do 580 return 0 581 582 self.init_progress_reporter.start() 583 self.init_progress_reporter.next_stage() 584 585 # Step A - Work out a list of tasks to run 586 # 587 # Taskdata gives us a list of possible providers for every build and run 588 # target ordered by priority. It also gives information on each of those 589 # providers. 590 # 591 # To create the actual list of tasks to execute we fix the list of 592 # providers and then resolve the dependencies into task IDs. This 593 # process is repeated for each type of dependency (tdepends, deptask, 594 # rdeptast, recrdeptask, idepends). 595 596 def add_build_dependencies(depids, tasknames, depends, mc): 597 for depname in depids: 598 # Won't be in build_targets if ASSUME_PROVIDED 599 if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]: 600 continue 601 depdata = taskData[mc].build_targets[depname][0] 602 if depdata is None: 603 continue 604 for taskname in tasknames: 605 t = depdata + ":" + taskname 606 if t in taskData[mc].taskentries: 607 depends.add(t) 608 609 def add_runtime_dependencies(depids, tasknames, depends, mc): 610 for depname in depids: 611 if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]: 612 continue 613 depdata = taskData[mc].run_targets[depname][0] 614 if depdata is None: 615 continue 616 for taskname in tasknames: 617 t = depdata + ":" + taskname 618 if t in taskData[mc].taskentries: 619 depends.add(t) 620 621 def add_mc_dependencies(mc, tid): 622 mcdeps = taskData[mc].get_mcdepends() 623 for dep in mcdeps: 624 mcdependency = dep.split(':') 625 pn = mcdependency[3] 626 frommc = mcdependency[1] 627 mcdep = mcdependency[2] 628 deptask = mcdependency[4] 629 if mc == frommc: 630 fn = taskData[mcdep].build_targets[pn][0] 631 newdep = '%s:%s' % (fn,deptask) 632 taskData[mc].taskentries[tid].tdepends.append(newdep) 633 634 for mc in taskData: 635 for tid in taskData[mc].taskentries: 636 637 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 638 #runtid = build_tid(mc, fn, taskname) 639 640 #logger.debug(2, "Processing %s,%s:%s", mc, fn, taskname) 641 642 depends = set() 643 task_deps = self.dataCaches[mc].task_deps[taskfn] 644 645 self.runtaskentries[tid] = RunTaskEntry() 646 647 if fn in taskData[mc].failed_fns: 648 continue 649 650 # We add multiconfig dependencies before processing internal task deps (tdepends) 651 if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']: 652 add_mc_dependencies(mc, tid) 653 654 # Resolve task internal dependencies 655 # 656 # e.g. addtask before X after Y 657 for t in taskData[mc].taskentries[tid].tdepends: 658 (depmc, depfn, deptaskname, _) = split_tid_mcfn(t) 659 depends.add(build_tid(depmc, depfn, deptaskname)) 660 661 # Resolve 'deptask' dependencies 662 # 663 # e.g. do_sometask[deptask] = "do_someothertask" 664 # (makes sure sometask runs after someothertask of all DEPENDS) 665 if 'deptask' in task_deps and taskname in task_deps['deptask']: 666 tasknames = task_deps['deptask'][taskname].split() 667 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 668 669 # Resolve 'rdeptask' dependencies 670 # 671 # e.g. do_sometask[rdeptask] = "do_someothertask" 672 # (makes sure sometask runs after someothertask of all RDEPENDS) 673 if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']: 674 tasknames = task_deps['rdeptask'][taskname].split() 675 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 676 677 # Resolve inter-task dependencies 678 # 679 # e.g. do_sometask[depends] = "targetname:do_someothertask" 680 # (makes sure sometask runs after targetname's someothertask) 681 idepends = taskData[mc].taskentries[tid].idepends 682 for (depname, idependtask) in idepends: 683 if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps: 684 # Won't be in build_targets if ASSUME_PROVIDED 685 depdata = taskData[mc].build_targets[depname][0] 686 if depdata is not None: 687 t = depdata + ":" + idependtask 688 depends.add(t) 689 if t not in taskData[mc].taskentries: 690 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 691 irdepends = taskData[mc].taskentries[tid].irdepends 692 for (depname, idependtask) in irdepends: 693 if depname in taskData[mc].run_targets: 694 # Won't be in run_targets if ASSUME_PROVIDED 695 if not taskData[mc].run_targets[depname]: 696 continue 697 depdata = taskData[mc].run_targets[depname][0] 698 if depdata is not None: 699 t = depdata + ":" + idependtask 700 depends.add(t) 701 if t not in taskData[mc].taskentries: 702 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 703 704 # Resolve recursive 'recrdeptask' dependencies (Part A) 705 # 706 # e.g. do_sometask[recrdeptask] = "do_someothertask" 707 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 708 # We cover the recursive part of the dependencies below 709 if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']: 710 tasknames = task_deps['recrdeptask'][taskname].split() 711 recursivetasks[tid] = tasknames 712 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 713 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 714 if taskname in tasknames: 715 recursivetasksselfref.add(tid) 716 717 if 'recideptask' in task_deps and taskname in task_deps['recideptask']: 718 recursiveitasks[tid] = [] 719 for t in task_deps['recideptask'][taskname].split(): 720 newdep = build_tid(mc, fn, t) 721 recursiveitasks[tid].append(newdep) 722 723 self.runtaskentries[tid].depends = depends 724 # Remove all self references 725 self.runtaskentries[tid].depends.discard(tid) 726 727 #self.dump_data() 728 729 self.init_progress_reporter.next_stage() 730 731 # Resolve recursive 'recrdeptask' dependencies (Part B) 732 # 733 # e.g. do_sometask[recrdeptask] = "do_someothertask" 734 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 735 # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed 736 737 # Generating/interating recursive lists of dependencies is painful and potentially slow 738 # Precompute recursive task dependencies here by: 739 # a) create a temp list of reverse dependencies (revdeps) 740 # b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0) 741 # c) combine the total list of dependencies in cumulativedeps 742 # d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower) 743 744 745 revdeps = {} 746 deps = {} 747 cumulativedeps = {} 748 for tid in self.runtaskentries: 749 deps[tid] = set(self.runtaskentries[tid].depends) 750 revdeps[tid] = set() 751 cumulativedeps[tid] = set() 752 # Generate a temp list of reverse dependencies 753 for tid in self.runtaskentries: 754 for dep in self.runtaskentries[tid].depends: 755 revdeps[dep].add(tid) 756 # Find the dependency chain endpoints 757 endpoints = set() 758 for tid in self.runtaskentries: 759 if len(deps[tid]) == 0: 760 endpoints.add(tid) 761 # Iterate the chains collating dependencies 762 while endpoints: 763 next = set() 764 for tid in endpoints: 765 for dep in revdeps[tid]: 766 cumulativedeps[dep].add(fn_from_tid(tid)) 767 cumulativedeps[dep].update(cumulativedeps[tid]) 768 if tid in deps[dep]: 769 deps[dep].remove(tid) 770 if len(deps[dep]) == 0: 771 next.add(dep) 772 endpoints = next 773 #for tid in deps: 774 # if len(deps[tid]) != 0: 775 # bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid])) 776 777 # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to 778 # resolve these recursively until we aren't adding any further extra dependencies 779 extradeps = True 780 while extradeps: 781 extradeps = 0 782 for tid in recursivetasks: 783 tasknames = recursivetasks[tid] 784 785 totaldeps = set(self.runtaskentries[tid].depends) 786 if tid in recursiveitasks: 787 totaldeps.update(recursiveitasks[tid]) 788 for dep in recursiveitasks[tid]: 789 if dep not in self.runtaskentries: 790 continue 791 totaldeps.update(self.runtaskentries[dep].depends) 792 793 deps = set() 794 for dep in totaldeps: 795 if dep in cumulativedeps: 796 deps.update(cumulativedeps[dep]) 797 798 for t in deps: 799 for taskname in tasknames: 800 newtid = t + ":" + taskname 801 if newtid == tid: 802 continue 803 if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends: 804 extradeps += 1 805 self.runtaskentries[tid].depends.add(newtid) 806 807 # Handle recursive tasks which depend upon other recursive tasks 808 deps = set() 809 for dep in self.runtaskentries[tid].depends.intersection(recursivetasks): 810 deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends)) 811 for newtid in deps: 812 for taskname in tasknames: 813 if not newtid.endswith(":" + taskname): 814 continue 815 if newtid in self.runtaskentries: 816 extradeps += 1 817 self.runtaskentries[tid].depends.add(newtid) 818 819 bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps) 820 821 # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work 822 for tid in recursivetasksselfref: 823 self.runtaskentries[tid].depends.difference_update(recursivetasksselfref) 824 825 self.init_progress_reporter.next_stage() 826 827 #self.dump_data() 828 829 # Step B - Mark all active tasks 830 # 831 # Start with the tasks we were asked to run and mark all dependencies 832 # as active too. If the task is to be 'forced', clear its stamp. Once 833 # all active tasks are marked, prune the ones we don't need. 834 835 logger.verbose("Marking Active Tasks") 836 837 def mark_active(tid, depth): 838 """ 839 Mark an item as active along with its depends 840 (calls itself recursively) 841 """ 842 843 if tid in runq_build: 844 return 845 846 runq_build[tid] = 1 847 848 depends = self.runtaskentries[tid].depends 849 for depend in depends: 850 mark_active(depend, depth+1) 851 852 def invalidate_task(tid, error_nostamp): 853 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 854 taskdep = self.dataCaches[mc].task_deps[taskfn] 855 if fn + ":" + taskname not in taskData[mc].taskentries: 856 logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) 857 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 858 if error_nostamp: 859 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) 860 else: 861 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) 862 else: 863 logger.verbose("Invalidate task %s, %s", taskname, fn) 864 bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn) 865 866 self.target_tids = [] 867 for (mc, target, task, fn) in self.targets: 868 869 if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]: 870 continue 871 872 if target in taskData[mc].failed_deps: 873 continue 874 875 parents = False 876 if task.endswith('-'): 877 parents = True 878 task = task[:-1] 879 880 if fn in taskData[mc].failed_fns: 881 continue 882 883 # fn already has mc prefix 884 tid = fn + ":" + task 885 self.target_tids.append(tid) 886 if tid not in taskData[mc].taskentries: 887 import difflib 888 tasks = [] 889 for x in taskData[mc].taskentries: 890 if x.startswith(fn + ":"): 891 tasks.append(taskname_from_tid(x)) 892 close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7) 893 if close_matches: 894 extra = ". Close matches:\n %s" % "\n ".join(close_matches) 895 else: 896 extra = "" 897 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra)) 898 899 # For tasks called "XXXX-", ony run their dependencies 900 if parents: 901 for i in self.runtaskentries[tid].depends: 902 mark_active(i, 1) 903 else: 904 mark_active(tid, 1) 905 906 self.init_progress_reporter.next_stage() 907 908 # Step C - Prune all inactive tasks 909 # 910 # Once all active tasks are marked, prune the ones we don't need. 911 912 delcount = {} 913 for tid in list(self.runtaskentries.keys()): 914 if tid not in runq_build: 915 delcount[tid] = self.runtaskentries[tid] 916 del self.runtaskentries[tid] 917 918 # Handle --runall 919 if self.cooker.configuration.runall: 920 # re-run the mark_active and then drop unused tasks from new list 921 runq_build = {} 922 923 for task in self.cooker.configuration.runall: 924 if not task.startswith("do_"): 925 task = "do_{0}".format(task) 926 runall_tids = set() 927 for tid in list(self.runtaskentries): 928 wanttid = "{0}:{1}".format(fn_from_tid(tid), task) 929 if wanttid in delcount: 930 self.runtaskentries[wanttid] = delcount[wanttid] 931 if wanttid in self.runtaskentries: 932 runall_tids.add(wanttid) 933 934 for tid in list(runall_tids): 935 mark_active(tid,1) 936 if self.cooker.configuration.force: 937 invalidate_task(tid, False) 938 939 for tid in list(self.runtaskentries.keys()): 940 if tid not in runq_build: 941 delcount[tid] = self.runtaskentries[tid] 942 del self.runtaskentries[tid] 943 944 if len(self.runtaskentries) == 0: 945 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets))) 946 947 self.init_progress_reporter.next_stage() 948 949 # Handle runonly 950 if self.cooker.configuration.runonly: 951 # re-run the mark_active and then drop unused tasks from new list 952 runq_build = {} 953 954 for task in self.cooker.configuration.runonly: 955 if not task.startswith("do_"): 956 task = "do_{0}".format(task) 957 runonly_tids = { k: v for k, v in self.runtaskentries.items() if taskname_from_tid(k) == task } 958 959 for tid in list(runonly_tids): 960 mark_active(tid,1) 961 if self.cooker.configuration.force: 962 invalidate_task(tid, False) 963 964 for tid in list(self.runtaskentries.keys()): 965 if tid not in runq_build: 966 delcount[tid] = self.runtaskentries[tid] 967 del self.runtaskentries[tid] 968 969 if len(self.runtaskentries) == 0: 970 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets))) 971 972 # 973 # Step D - Sanity checks and computation 974 # 975 976 # Check to make sure we still have tasks to run 977 if len(self.runtaskentries) == 0: 978 if not taskData[''].abort: 979 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") 980 else: 981 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") 982 983 logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries)) 984 985 logger.verbose("Assign Weightings") 986 987 self.init_progress_reporter.next_stage() 988 989 # Generate a list of reverse dependencies to ease future calculations 990 for tid in self.runtaskentries: 991 for dep in self.runtaskentries[tid].depends: 992 self.runtaskentries[dep].revdeps.add(tid) 993 994 self.init_progress_reporter.next_stage() 995 996 # Identify tasks at the end of dependency chains 997 # Error on circular dependency loops (length two) 998 endpoints = [] 999 for tid in self.runtaskentries: 1000 revdeps = self.runtaskentries[tid].revdeps 1001 if len(revdeps) == 0: 1002 endpoints.append(tid) 1003 for dep in revdeps: 1004 if dep in self.runtaskentries[tid].depends: 1005 bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep)) 1006 1007 1008 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) 1009 1010 self.init_progress_reporter.next_stage() 1011 1012 # Calculate task weights 1013 # Check of higher length circular dependencies 1014 self.runq_weight = self.calculate_task_weights(endpoints) 1015 1016 self.init_progress_reporter.next_stage() 1017 1018 # Sanity Check - Check for multiple tasks building the same provider 1019 for mc in self.dataCaches: 1020 prov_list = {} 1021 seen_fn = [] 1022 for tid in self.runtaskentries: 1023 (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1024 if taskfn in seen_fn: 1025 continue 1026 if mc != tidmc: 1027 continue 1028 seen_fn.append(taskfn) 1029 for prov in self.dataCaches[mc].fn_provides[taskfn]: 1030 if prov not in prov_list: 1031 prov_list[prov] = [taskfn] 1032 elif taskfn not in prov_list[prov]: 1033 prov_list[prov].append(taskfn) 1034 for prov in prov_list: 1035 if len(prov_list[prov]) < 2: 1036 continue 1037 if prov in self.multi_provider_whitelist: 1038 continue 1039 seen_pn = [] 1040 # If two versions of the same PN are being built its fatal, we don't support it. 1041 for fn in prov_list[prov]: 1042 pn = self.dataCaches[mc].pkg_fn[fn] 1043 if pn not in seen_pn: 1044 seen_pn.append(pn) 1045 else: 1046 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) 1047 msg = "Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov])) 1048 # 1049 # Construct a list of things which uniquely depend on each provider 1050 # since this may help the user figure out which dependency is triggering this warning 1051 # 1052 msg += "\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from." 1053 deplist = {} 1054 commondeps = None 1055 for provfn in prov_list[prov]: 1056 deps = set() 1057 for tid in self.runtaskentries: 1058 fn = fn_from_tid(tid) 1059 if fn != provfn: 1060 continue 1061 for dep in self.runtaskentries[tid].revdeps: 1062 fn = fn_from_tid(dep) 1063 if fn == provfn: 1064 continue 1065 deps.add(dep) 1066 if not commondeps: 1067 commondeps = set(deps) 1068 else: 1069 commondeps &= deps 1070 deplist[provfn] = deps 1071 for provfn in deplist: 1072 msg += "\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps)) 1073 # 1074 # Construct a list of provides and runtime providers for each recipe 1075 # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC) 1076 # 1077 msg += "\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful." 1078 provide_results = {} 1079 rprovide_results = {} 1080 commonprovs = None 1081 commonrprovs = None 1082 for provfn in prov_list[prov]: 1083 provides = set(self.dataCaches[mc].fn_provides[provfn]) 1084 rprovides = set() 1085 for rprovide in self.dataCaches[mc].rproviders: 1086 if provfn in self.dataCaches[mc].rproviders[rprovide]: 1087 rprovides.add(rprovide) 1088 for package in self.dataCaches[mc].packages: 1089 if provfn in self.dataCaches[mc].packages[package]: 1090 rprovides.add(package) 1091 for package in self.dataCaches[mc].packages_dynamic: 1092 if provfn in self.dataCaches[mc].packages_dynamic[package]: 1093 rprovides.add(package) 1094 if not commonprovs: 1095 commonprovs = set(provides) 1096 else: 1097 commonprovs &= provides 1098 provide_results[provfn] = provides 1099 if not commonrprovs: 1100 commonrprovs = set(rprovides) 1101 else: 1102 commonrprovs &= rprovides 1103 rprovide_results[provfn] = rprovides 1104 #msg += "\nCommon provides:\n %s" % ("\n ".join(commonprovs)) 1105 #msg += "\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs)) 1106 for provfn in prov_list[prov]: 1107 msg += "\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs)) 1108 msg += "\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs)) 1109 1110 if self.warn_multi_bb: 1111 logger.verbnote(msg) 1112 else: 1113 logger.error(msg) 1114 1115 self.init_progress_reporter.next_stage() 1116 1117 # Create a whitelist usable by the stamp checks 1118 self.stampfnwhitelist = {} 1119 for mc in self.taskData: 1120 self.stampfnwhitelist[mc] = [] 1121 for entry in self.stampwhitelist.split(): 1122 if entry not in self.taskData[mc].build_targets: 1123 continue 1124 fn = self.taskData.build_targets[entry][0] 1125 self.stampfnwhitelist[mc].append(fn) 1126 1127 self.init_progress_reporter.next_stage() 1128 1129 # Iterate over the task list looking for tasks with a 'setscene' function 1130 self.runq_setscene_tids = set() 1131 if not self.cooker.configuration.nosetscene: 1132 for tid in self.runtaskentries: 1133 (mc, fn, taskname, _) = split_tid_mcfn(tid) 1134 setscenetid = tid + "_setscene" 1135 if setscenetid not in taskData[mc].taskentries: 1136 continue 1137 self.runq_setscene_tids.add(tid) 1138 1139 self.init_progress_reporter.next_stage() 1140 1141 # Invalidate task if force mode active 1142 if self.cooker.configuration.force: 1143 for tid in self.target_tids: 1144 invalidate_task(tid, False) 1145 1146 # Invalidate task if invalidate mode active 1147 if self.cooker.configuration.invalidate_stamp: 1148 for tid in self.target_tids: 1149 fn = fn_from_tid(tid) 1150 for st in self.cooker.configuration.invalidate_stamp.split(','): 1151 if not st.startswith("do_"): 1152 st = "do_%s" % st 1153 invalidate_task(fn + ":" + st, True) 1154 1155 self.init_progress_reporter.next_stage() 1156 1157 # Create and print to the logs a virtual/xxxx -> PN (fn) table 1158 for mc in taskData: 1159 virtmap = taskData[mc].get_providermap(prefix="virtual/") 1160 virtpnmap = {} 1161 for v in virtmap: 1162 virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]] 1163 bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v])) 1164 if hasattr(bb.parse.siggen, "tasks_resolved"): 1165 bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc]) 1166 1167 self.init_progress_reporter.next_stage() 1168 1169 bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) 1170 1171 # Iterate over the task list and call into the siggen code 1172 dealtwith = set() 1173 todeal = set(self.runtaskentries) 1174 while len(todeal) > 0: 1175 for tid in todeal.copy(): 1176 if len(self.runtaskentries[tid].depends - dealtwith) == 0: 1177 dealtwith.add(tid) 1178 todeal.remove(tid) 1179 self.prepare_task_hash(tid) 1180 1181 bb.parse.siggen.writeout_file_checksum_cache() 1182 1183 #self.dump_data() 1184 return len(self.runtaskentries) 1185 1186 def prepare_task_hash(self, tid): 1187 bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches[mc_from_tid(tid)]) 1188 self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches[mc_from_tid(tid)]) 1189 self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) 1190 1191 def dump_data(self): 1192 """ 1193 Dump some debug information on the internal data structures 1194 """ 1195 logger.debug(3, "run_tasks:") 1196 for tid in self.runtaskentries: 1197 logger.debug(3, " %s: %s Deps %s RevDeps %s", tid, 1198 self.runtaskentries[tid].weight, 1199 self.runtaskentries[tid].depends, 1200 self.runtaskentries[tid].revdeps) 1201 1202class RunQueueWorker(): 1203 def __init__(self, process, pipe): 1204 self.process = process 1205 self.pipe = pipe 1206 1207class RunQueue: 1208 def __init__(self, cooker, cfgData, dataCaches, taskData, targets): 1209 1210 self.cooker = cooker 1211 self.cfgData = cfgData 1212 self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets) 1213 1214 self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY") or "perfile" 1215 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None 1216 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None 1217 1218 self.state = runQueuePrepare 1219 1220 # For disk space monitor 1221 # Invoked at regular time intervals via the bitbake heartbeat event 1222 # while the build is running. We generate a unique name for the handler 1223 # here, just in case that there ever is more than one RunQueue instance, 1224 # start the handler when reaching runQueueSceneInit, and stop it when 1225 # done with the build. 1226 self.dm = monitordisk.diskMonitor(cfgData) 1227 self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self)) 1228 self.dm_event_handler_registered = False 1229 self.rqexe = None 1230 self.worker = {} 1231 self.fakeworker = {} 1232 1233 def _start_worker(self, mc, fakeroot = False, rqexec = None): 1234 logger.debug(1, "Starting bitbake-worker") 1235 magic = "decafbad" 1236 if self.cooker.configuration.profile: 1237 magic = "decafbadbad" 1238 if fakeroot: 1239 magic = magic + "beef" 1240 mcdata = self.cooker.databuilder.mcdata[mc] 1241 fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD")) 1242 fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split() 1243 env = os.environ.copy() 1244 for key, value in (var.split('=') for var in fakerootenv): 1245 env[key] = value 1246 worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) 1247 else: 1248 worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) 1249 bb.utils.nonblockingfd(worker.stdout) 1250 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec) 1251 1252 workerdata = { 1253 "taskdeps" : self.rqdata.dataCaches[mc].task_deps, 1254 "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv, 1255 "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs, 1256 "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv, 1257 "sigdata" : bb.parse.siggen.get_taskdata(), 1258 "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, 1259 "logdefaultverbose" : bb.msg.loggerDefaultVerbose, 1260 "logdefaultverboselogs" : bb.msg.loggerVerboseLogs, 1261 "logdefaultdomain" : bb.msg.loggerDefaultDomains, 1262 "prhost" : self.cooker.prhost, 1263 "buildname" : self.cfgData.getVar("BUILDNAME"), 1264 "date" : self.cfgData.getVar("DATE"), 1265 "time" : self.cfgData.getVar("TIME"), 1266 "hashservaddr" : self.cooker.hashservaddr, 1267 } 1268 1269 worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") 1270 worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") 1271 worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") 1272 worker.stdin.flush() 1273 1274 return RunQueueWorker(worker, workerpipe) 1275 1276 def _teardown_worker(self, worker): 1277 if not worker: 1278 return 1279 logger.debug(1, "Teardown for bitbake-worker") 1280 try: 1281 worker.process.stdin.write(b"<quit></quit>") 1282 worker.process.stdin.flush() 1283 worker.process.stdin.close() 1284 except IOError: 1285 pass 1286 while worker.process.returncode is None: 1287 worker.pipe.read() 1288 worker.process.poll() 1289 while worker.pipe.read(): 1290 continue 1291 worker.pipe.close() 1292 1293 def start_worker(self): 1294 if self.worker: 1295 self.teardown_workers() 1296 self.teardown = False 1297 for mc in self.rqdata.dataCaches: 1298 self.worker[mc] = self._start_worker(mc) 1299 1300 def start_fakeworker(self, rqexec, mc): 1301 if not mc in self.fakeworker: 1302 self.fakeworker[mc] = self._start_worker(mc, True, rqexec) 1303 1304 def teardown_workers(self): 1305 self.teardown = True 1306 for mc in self.worker: 1307 self._teardown_worker(self.worker[mc]) 1308 self.worker = {} 1309 for mc in self.fakeworker: 1310 self._teardown_worker(self.fakeworker[mc]) 1311 self.fakeworker = {} 1312 1313 def read_workers(self): 1314 for mc in self.worker: 1315 self.worker[mc].pipe.read() 1316 for mc in self.fakeworker: 1317 self.fakeworker[mc].pipe.read() 1318 1319 def active_fds(self): 1320 fds = [] 1321 for mc in self.worker: 1322 fds.append(self.worker[mc].pipe.input) 1323 for mc in self.fakeworker: 1324 fds.append(self.fakeworker[mc].pipe.input) 1325 return fds 1326 1327 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): 1328 def get_timestamp(f): 1329 try: 1330 if not os.access(f, os.F_OK): 1331 return None 1332 return os.stat(f)[stat.ST_MTIME] 1333 except: 1334 return None 1335 1336 (mc, fn, tn, taskfn) = split_tid_mcfn(tid) 1337 if taskname is None: 1338 taskname = tn 1339 1340 if self.stamppolicy == "perfile": 1341 fulldeptree = False 1342 else: 1343 fulldeptree = True 1344 stampwhitelist = [] 1345 if self.stamppolicy == "whitelist": 1346 stampwhitelist = self.rqdata.stampfnwhitelist[mc] 1347 1348 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn) 1349 1350 # If the stamp is missing, it's not current 1351 if not os.access(stampfile, os.F_OK): 1352 logger.debug(2, "Stampfile %s not available", stampfile) 1353 return False 1354 # If it's a 'nostamp' task, it's not current 1355 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1356 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 1357 logger.debug(2, "%s.%s is nostamp\n", fn, taskname) 1358 return False 1359 1360 if taskname != "do_setscene" and taskname.endswith("_setscene"): 1361 return True 1362 1363 if cache is None: 1364 cache = {} 1365 1366 iscurrent = True 1367 t1 = get_timestamp(stampfile) 1368 for dep in self.rqdata.runtaskentries[tid].depends: 1369 if iscurrent: 1370 (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep) 1371 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2) 1372 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2) 1373 t2 = get_timestamp(stampfile2) 1374 t3 = get_timestamp(stampfile3) 1375 if t3 and not t2: 1376 continue 1377 if t3 and t3 > t2: 1378 continue 1379 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): 1380 if not t2: 1381 logger.debug(2, 'Stampfile %s does not exist', stampfile2) 1382 iscurrent = False 1383 break 1384 if t1 < t2: 1385 logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2) 1386 iscurrent = False 1387 break 1388 if recurse and iscurrent: 1389 if dep in cache: 1390 iscurrent = cache[dep] 1391 if not iscurrent: 1392 logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) 1393 else: 1394 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) 1395 cache[dep] = iscurrent 1396 if recurse: 1397 cache[tid] = iscurrent 1398 return iscurrent 1399 1400 def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True): 1401 valid = set() 1402 if self.hashvalidate: 1403 sq_data = {} 1404 sq_data['hash'] = {} 1405 sq_data['hashfn'] = {} 1406 sq_data['unihash'] = {} 1407 for tid in tocheck: 1408 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1409 sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash 1410 sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn] 1411 sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash 1412 1413 valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary) 1414 1415 return valid 1416 1417 def validate_hash(self, sq_data, d, siginfo, currentcount, summary): 1418 locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary} 1419 1420 # Metadata has **kwargs so args can be added, sq_data can also gain new fields 1421 call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)" 1422 1423 return bb.utils.better_eval(call, locs) 1424 1425 def _execute_runqueue(self): 1426 """ 1427 Run the tasks in a queue prepared by rqdata.prepare() 1428 Upon failure, optionally try to recover the build using any alternate providers 1429 (if the abort on failure configuration option isn't set) 1430 """ 1431 1432 retval = True 1433 1434 if self.state is runQueuePrepare: 1435 # NOTE: if you add, remove or significantly refactor the stages of this 1436 # process then you should recalculate the weightings here. This is quite 1437 # easy to do - just change the next line temporarily to pass debug=True as 1438 # the last parameter and you'll get a printout of the weightings as well 1439 # as a map to the lines where next_stage() was called. Of course this isn't 1440 # critical, but it helps to keep the progress reporting accurate. 1441 self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data, 1442 "Initialising tasks", 1443 [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244]) 1444 if self.rqdata.prepare() == 0: 1445 self.state = runQueueComplete 1446 else: 1447 self.state = runQueueSceneInit 1448 bb.parse.siggen.save_unitaskhashes() 1449 1450 if self.state is runQueueSceneInit: 1451 self.rqdata.init_progress_reporter.next_stage() 1452 1453 # we are ready to run, emit dependency info to any UI or class which 1454 # needs it 1455 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) 1456 self.rqdata.init_progress_reporter.next_stage() 1457 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) 1458 1459 if not self.dm_event_handler_registered: 1460 res = bb.event.register(self.dm_event_handler_name, 1461 lambda x: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False, 1462 ('bb.event.HeartbeatEvent',)) 1463 self.dm_event_handler_registered = True 1464 1465 dump = self.cooker.configuration.dump_signatures 1466 if dump: 1467 self.rqdata.init_progress_reporter.finish() 1468 if 'printdiff' in dump: 1469 invalidtasks = self.print_diffscenetasks() 1470 self.dump_signatures(dump) 1471 if 'printdiff' in dump: 1472 self.write_diffscenetasks(invalidtasks) 1473 self.state = runQueueComplete 1474 1475 if self.state is runQueueSceneInit: 1476 self.rqdata.init_progress_reporter.next_stage() 1477 self.start_worker() 1478 self.rqdata.init_progress_reporter.next_stage() 1479 self.rqexe = RunQueueExecute(self) 1480 1481 # If we don't have any setscene functions, skip execution 1482 if len(self.rqdata.runq_setscene_tids) == 0: 1483 logger.info('No setscene tasks') 1484 for tid in self.rqdata.runtaskentries: 1485 if len(self.rqdata.runtaskentries[tid].depends) == 0: 1486 self.rqexe.setbuildable(tid) 1487 self.rqexe.tasks_notcovered.add(tid) 1488 self.rqexe.sqdone = True 1489 logger.info('Executing Tasks') 1490 self.state = runQueueRunning 1491 1492 if self.state is runQueueRunning: 1493 retval = self.rqexe.execute() 1494 1495 if self.state is runQueueCleanUp: 1496 retval = self.rqexe.finish() 1497 1498 build_done = self.state is runQueueComplete or self.state is runQueueFailed 1499 1500 if build_done and self.dm_event_handler_registered: 1501 bb.event.remove(self.dm_event_handler_name, None) 1502 self.dm_event_handler_registered = False 1503 1504 if build_done and self.rqexe: 1505 bb.parse.siggen.save_unitaskhashes() 1506 self.teardown_workers() 1507 if self.rqexe: 1508 if self.rqexe.stats.failed: 1509 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) 1510 else: 1511 # Let's avoid the word "failed" if nothing actually did 1512 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) 1513 1514 if self.state is runQueueFailed: 1515 raise bb.runqueue.TaskFailure(self.rqexe.failed_tids) 1516 1517 if self.state is runQueueComplete: 1518 # All done 1519 return False 1520 1521 # Loop 1522 return retval 1523 1524 def execute_runqueue(self): 1525 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop. 1526 try: 1527 return self._execute_runqueue() 1528 except bb.runqueue.TaskFailure: 1529 raise 1530 except SystemExit: 1531 raise 1532 except bb.BBHandledException: 1533 try: 1534 self.teardown_workers() 1535 except: 1536 pass 1537 self.state = runQueueComplete 1538 raise 1539 except Exception as err: 1540 logger.exception("An uncaught exception occurred in runqueue") 1541 try: 1542 self.teardown_workers() 1543 except: 1544 pass 1545 self.state = runQueueComplete 1546 raise 1547 1548 def finish_runqueue(self, now = False): 1549 if not self.rqexe: 1550 self.state = runQueueComplete 1551 return 1552 1553 if now: 1554 self.rqexe.finish_now() 1555 else: 1556 self.rqexe.finish() 1557 1558 def rq_dump_sigfn(self, fn, options): 1559 bb_cache = bb.cache.NoCache(self.cooker.databuilder) 1560 the_data = bb_cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn)) 1561 siggen = bb.parse.siggen 1562 dataCaches = self.rqdata.dataCaches 1563 siggen.dump_sigfn(fn, dataCaches, options) 1564 1565 def dump_signatures(self, options): 1566 fns = set() 1567 bb.note("Reparsing files to collect dependency data") 1568 1569 for tid in self.rqdata.runtaskentries: 1570 fn = fn_from_tid(tid) 1571 fns.add(fn) 1572 1573 max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1) 1574 # We cannot use the real multiprocessing.Pool easily due to some local data 1575 # that can't be pickled. This is a cheap multi-process solution. 1576 launched = [] 1577 while fns: 1578 if len(launched) < max_process: 1579 p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options)) 1580 p.start() 1581 launched.append(p) 1582 for q in launched: 1583 # The finished processes are joined when calling is_alive() 1584 if not q.is_alive(): 1585 launched.remove(q) 1586 for p in launched: 1587 p.join() 1588 1589 bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options) 1590 1591 return 1592 1593 def print_diffscenetasks(self): 1594 1595 noexec = [] 1596 tocheck = set() 1597 1598 for tid in self.rqdata.runtaskentries: 1599 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1600 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1601 1602 if 'noexec' in taskdep and taskname in taskdep['noexec']: 1603 noexec.append(tid) 1604 continue 1605 1606 tocheck.add(tid) 1607 1608 valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False) 1609 1610 # Tasks which are both setscene and noexec never care about dependencies 1611 # We therefore find tasks which are setscene and noexec and mark their 1612 # unique dependencies as valid. 1613 for tid in noexec: 1614 if tid not in self.rqdata.runq_setscene_tids: 1615 continue 1616 for dep in self.rqdata.runtaskentries[tid].depends: 1617 hasnoexecparents = True 1618 for dep2 in self.rqdata.runtaskentries[dep].revdeps: 1619 if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec: 1620 continue 1621 hasnoexecparents = False 1622 break 1623 if hasnoexecparents: 1624 valid_new.add(dep) 1625 1626 invalidtasks = set() 1627 for tid in self.rqdata.runtaskentries: 1628 if tid not in valid_new and tid not in noexec: 1629 invalidtasks.add(tid) 1630 1631 found = set() 1632 processed = set() 1633 for tid in invalidtasks: 1634 toprocess = set([tid]) 1635 while toprocess: 1636 next = set() 1637 for t in toprocess: 1638 for dep in self.rqdata.runtaskentries[t].depends: 1639 if dep in invalidtasks: 1640 found.add(tid) 1641 if dep not in processed: 1642 processed.add(dep) 1643 next.add(dep) 1644 toprocess = next 1645 if tid in found: 1646 toprocess = set() 1647 1648 tasklist = [] 1649 for tid in invalidtasks.difference(found): 1650 tasklist.append(tid) 1651 1652 if tasklist: 1653 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) 1654 1655 return invalidtasks.difference(found) 1656 1657 def write_diffscenetasks(self, invalidtasks): 1658 1659 # Define recursion callback 1660 def recursecb(key, hash1, hash2): 1661 hashes = [hash1, hash2] 1662 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) 1663 1664 recout = [] 1665 if len(hashfiles) == 2: 1666 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb) 1667 recout.extend(list(' ' + l for l in out2)) 1668 else: 1669 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) 1670 1671 return recout 1672 1673 1674 for tid in invalidtasks: 1675 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1676 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1677 h = self.rqdata.runtaskentries[tid].hash 1678 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData) 1679 match = None 1680 for m in matches: 1681 if h in m: 1682 match = m 1683 if match is None: 1684 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h) 1685 matches = {k : v for k, v in iter(matches.items()) if h not in k} 1686 if matches: 1687 latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1] 1688 prevh = __find_sha256__.search(latestmatch).group(0) 1689 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) 1690 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) 1691 1692 1693class RunQueueExecute: 1694 1695 def __init__(self, rq): 1696 self.rq = rq 1697 self.cooker = rq.cooker 1698 self.cfgData = rq.cfgData 1699 self.rqdata = rq.rqdata 1700 1701 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) 1702 self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" 1703 1704 self.sq_buildable = set() 1705 self.sq_running = set() 1706 self.sq_live = set() 1707 1708 self.updated_taskhash_queue = [] 1709 self.pending_migrations = set() 1710 1711 self.runq_buildable = set() 1712 self.runq_running = set() 1713 self.runq_complete = set() 1714 self.runq_tasksrun = set() 1715 1716 self.build_stamps = {} 1717 self.build_stamps2 = [] 1718 self.failed_tids = [] 1719 self.sq_deferred = {} 1720 1721 self.stampcache = {} 1722 1723 self.holdoff_tasks = set() 1724 self.holdoff_need_update = True 1725 self.sqdone = False 1726 1727 self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) 1728 self.sq_stats = RunQueueStats(len(self.rqdata.runq_setscene_tids)) 1729 1730 for mc in rq.worker: 1731 rq.worker[mc].pipe.setrunqueueexec(self) 1732 for mc in rq.fakeworker: 1733 rq.fakeworker[mc].pipe.setrunqueueexec(self) 1734 1735 if self.number_tasks <= 0: 1736 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) 1737 1738 # List of setscene tasks which we've covered 1739 self.scenequeue_covered = set() 1740 # List of tasks which are covered (including setscene ones) 1741 self.tasks_covered = set() 1742 self.tasks_scenequeue_done = set() 1743 self.scenequeue_notcovered = set() 1744 self.tasks_notcovered = set() 1745 self.scenequeue_notneeded = set() 1746 1747 # We can't skip specified target tasks which aren't setscene tasks 1748 self.cantskip = set(self.rqdata.target_tids) 1749 self.cantskip.difference_update(self.rqdata.runq_setscene_tids) 1750 self.cantskip.intersection_update(self.rqdata.runtaskentries) 1751 1752 schedulers = self.get_schedulers() 1753 for scheduler in schedulers: 1754 if self.scheduler == scheduler.name: 1755 self.sched = scheduler(self, self.rqdata) 1756 logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) 1757 break 1758 else: 1759 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1760 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1761 1762 #if len(self.rqdata.runq_setscene_tids) > 0: 1763 self.sqdata = SQData() 1764 build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self) 1765 1766 def runqueue_process_waitpid(self, task, status): 1767 1768 # self.build_stamps[pid] may not exist when use shared work directory. 1769 if task in self.build_stamps: 1770 self.build_stamps2.remove(self.build_stamps[task]) 1771 del self.build_stamps[task] 1772 1773 if task in self.sq_live: 1774 if status != 0: 1775 self.sq_task_fail(task, status) 1776 else: 1777 self.sq_task_complete(task) 1778 self.sq_live.remove(task) 1779 else: 1780 if status != 0: 1781 self.task_fail(task, status) 1782 else: 1783 self.task_complete(task) 1784 return True 1785 1786 def finish_now(self): 1787 for mc in self.rq.worker: 1788 try: 1789 self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") 1790 self.rq.worker[mc].process.stdin.flush() 1791 except IOError: 1792 # worker must have died? 1793 pass 1794 for mc in self.rq.fakeworker: 1795 try: 1796 self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") 1797 self.rq.fakeworker[mc].process.stdin.flush() 1798 except IOError: 1799 # worker must have died? 1800 pass 1801 1802 if len(self.failed_tids) != 0: 1803 self.rq.state = runQueueFailed 1804 return 1805 1806 self.rq.state = runQueueComplete 1807 return 1808 1809 def finish(self): 1810 self.rq.state = runQueueCleanUp 1811 1812 active = self.stats.active + self.sq_stats.active 1813 if active > 0: 1814 bb.event.fire(runQueueExitWait(active), self.cfgData) 1815 self.rq.read_workers() 1816 return self.rq.active_fds() 1817 1818 if len(self.failed_tids) != 0: 1819 self.rq.state = runQueueFailed 1820 return True 1821 1822 self.rq.state = runQueueComplete 1823 return True 1824 1825 # Used by setscene only 1826 def check_dependencies(self, task, taskdeps): 1827 if not self.rq.depvalidate: 1828 return False 1829 1830 # Must not edit parent data 1831 taskdeps = set(taskdeps) 1832 1833 taskdata = {} 1834 taskdeps.add(task) 1835 for dep in taskdeps: 1836 (mc, fn, taskname, taskfn) = split_tid_mcfn(dep) 1837 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1838 taskdata[dep] = [pn, taskname, fn] 1839 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)" 1840 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data } 1841 valid = bb.utils.better_eval(call, locs) 1842 return valid 1843 1844 def can_start_task(self): 1845 active = self.stats.active + self.sq_stats.active 1846 can_start = active < self.number_tasks 1847 return can_start 1848 1849 def get_schedulers(self): 1850 schedulers = set(obj for obj in globals().values() 1851 if type(obj) is type and 1852 issubclass(obj, RunQueueScheduler)) 1853 1854 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS") 1855 if user_schedulers: 1856 for sched in user_schedulers.split(): 1857 if not "." in sched: 1858 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) 1859 continue 1860 1861 modname, name = sched.rsplit(".", 1) 1862 try: 1863 module = __import__(modname, fromlist=(name,)) 1864 except ImportError as exc: 1865 logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) 1866 raise SystemExit(1) 1867 else: 1868 schedulers.add(getattr(module, name)) 1869 return schedulers 1870 1871 def setbuildable(self, task): 1872 self.runq_buildable.add(task) 1873 self.sched.newbuildable(task) 1874 1875 def task_completeoutright(self, task): 1876 """ 1877 Mark a task as completed 1878 Look at the reverse dependencies and mark any task with 1879 completed dependencies as buildable 1880 """ 1881 self.runq_complete.add(task) 1882 for revdep in self.rqdata.runtaskentries[task].revdeps: 1883 if revdep in self.runq_running: 1884 continue 1885 if revdep in self.runq_buildable: 1886 continue 1887 alldeps = True 1888 for dep in self.rqdata.runtaskentries[revdep].depends: 1889 if dep not in self.runq_complete: 1890 alldeps = False 1891 break 1892 if alldeps: 1893 self.setbuildable(revdep) 1894 logger.debug(1, "Marking task %s as buildable", revdep) 1895 1896 def task_complete(self, task): 1897 self.stats.taskCompleted() 1898 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 1899 self.task_completeoutright(task) 1900 self.runq_tasksrun.add(task) 1901 1902 def task_fail(self, task, exitcode): 1903 """ 1904 Called when a task has failed 1905 Updates the state engine with the failure 1906 """ 1907 self.stats.taskFailed() 1908 self.failed_tids.append(task) 1909 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData) 1910 if self.rqdata.taskData[''].abort: 1911 self.rq.state = runQueueCleanUp 1912 1913 def task_skip(self, task, reason): 1914 self.runq_running.add(task) 1915 self.setbuildable(task) 1916 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData) 1917 self.task_completeoutright(task) 1918 self.stats.taskSkipped() 1919 self.stats.taskCompleted() 1920 1921 def summarise_scenequeue_errors(self): 1922 err = False 1923 if not self.sqdone: 1924 logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) 1925 completeevent = sceneQueueComplete(self.sq_stats, self.rq) 1926 bb.event.fire(completeevent, self.cfgData) 1927 if self.sq_deferred: 1928 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) 1929 err = True 1930 if self.updated_taskhash_queue: 1931 logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) 1932 err = True 1933 if self.holdoff_tasks: 1934 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) 1935 err = True 1936 1937 for tid in self.rqdata.runq_setscene_tids: 1938 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 1939 err = True 1940 logger.error("Setscene Task %s was never marked as covered or not covered" % tid) 1941 if tid not in self.sq_buildable: 1942 err = True 1943 logger.error("Setscene Task %s was never marked as buildable" % tid) 1944 if tid not in self.sq_running: 1945 err = True 1946 logger.error("Setscene Task %s was never marked as running" % tid) 1947 1948 for x in self.rqdata.runtaskentries: 1949 if x not in self.tasks_covered and x not in self.tasks_notcovered: 1950 logger.error("Task %s was never moved from the setscene queue" % x) 1951 err = True 1952 if x not in self.tasks_scenequeue_done: 1953 logger.error("Task %s was never processed by the setscene code" % x) 1954 err = True 1955 if len(self.rqdata.runtaskentries[x].depends) == 0 and x not in self.runq_buildable: 1956 logger.error("Task %s was never marked as buildable by the setscene code" % x) 1957 err = True 1958 return err 1959 1960 1961 def execute(self): 1962 """ 1963 Run the tasks in a queue prepared by prepare_runqueue 1964 """ 1965 1966 self.rq.read_workers() 1967 if self.updated_taskhash_queue or self.pending_migrations: 1968 self.process_possible_migrations() 1969 1970 if not hasattr(self, "sorted_setscene_tids"): 1971 # Don't want to sort this set every execution 1972 self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids) 1973 1974 task = None 1975 if not self.sqdone and self.can_start_task(): 1976 # Find the next setscene to run 1977 for nexttask in self.sorted_setscene_tids: 1978 if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values(): 1979 if nexttask not in self.sqdata.unskippable and len(self.sqdata.sq_revdeps[nexttask]) > 0 and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): 1980 if nexttask not in self.rqdata.target_tids: 1981 logger.debug(2, "Skipping setscene for task %s" % nexttask) 1982 self.sq_task_skip(nexttask) 1983 self.scenequeue_notneeded.add(nexttask) 1984 if nexttask in self.sq_deferred: 1985 del self.sq_deferred[nexttask] 1986 return True 1987 # If covered tasks are running, need to wait for them to complete 1988 for t in self.sqdata.sq_covered_tasks[nexttask]: 1989 if t in self.runq_running and t not in self.runq_complete: 1990 continue 1991 if nexttask in self.sq_deferred: 1992 if self.sq_deferred[nexttask] not in self.runq_complete: 1993 continue 1994 logger.debug(1, "Task %s no longer deferred" % nexttask) 1995 del self.sq_deferred[nexttask] 1996 valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False) 1997 if not valid: 1998 logger.debug(1, "%s didn't become valid, skipping setscene" % nexttask) 1999 self.sq_task_failoutright(nexttask) 2000 return True 2001 else: 2002 self.sqdata.outrightfail.remove(nexttask) 2003 if nexttask in self.sqdata.outrightfail: 2004 logger.debug(2, 'No package found, so skipping setscene task %s', nexttask) 2005 self.sq_task_failoutright(nexttask) 2006 return True 2007 if nexttask in self.sqdata.unskippable: 2008 logger.debug(2, "Setscene task %s is unskippable" % nexttask) 2009 task = nexttask 2010 break 2011 if task is not None: 2012 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2013 taskname = taskname + "_setscene" 2014 if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): 2015 logger.debug(2, 'Stamp for underlying task %s is current, so skipping setscene variant', task) 2016 self.sq_task_failoutright(task) 2017 return True 2018 2019 if self.cooker.configuration.force: 2020 if task in self.rqdata.target_tids: 2021 self.sq_task_failoutright(task) 2022 return True 2023 2024 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2025 logger.debug(2, 'Setscene stamp current task %s, so skip it and its dependencies', task) 2026 self.sq_task_skip(task) 2027 return True 2028 2029 if self.cooker.configuration.skipsetscene: 2030 logger.debug(2, 'No setscene tasks should be executed. Skipping %s', task) 2031 self.sq_task_failoutright(task) 2032 return True 2033 2034 startevent = sceneQueueTaskStarted(task, self.sq_stats, self.rq) 2035 bb.event.fire(startevent, self.cfgData) 2036 2037 taskdepdata = self.sq_build_taskdepdata(task) 2038 2039 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2040 taskhash = self.rqdata.get_task_hash(task) 2041 unihash = self.rqdata.get_task_unihash(task) 2042 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2043 if not mc in self.rq.fakeworker: 2044 self.rq.start_fakeworker(self, mc) 2045 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") 2046 self.rq.fakeworker[mc].process.stdin.flush() 2047 else: 2048 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") 2049 self.rq.worker[mc].process.stdin.flush() 2050 2051 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 2052 self.build_stamps2.append(self.build_stamps[task]) 2053 self.sq_running.add(task) 2054 self.sq_live.add(task) 2055 self.sq_stats.taskActive() 2056 if self.can_start_task(): 2057 return True 2058 2059 self.update_holdofftasks() 2060 2061 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: 2062 hashequiv_logger.verbose("Setscene tasks completed") 2063 2064 err = self.summarise_scenequeue_errors() 2065 if err: 2066 self.rq.state = runQueueFailed 2067 return True 2068 2069 if self.cooker.configuration.setsceneonly: 2070 self.rq.state = runQueueComplete 2071 return True 2072 self.sqdone = True 2073 2074 if self.stats.total == 0: 2075 # nothing to do 2076 self.rq.state = runQueueComplete 2077 return True 2078 2079 if self.cooker.configuration.setsceneonly: 2080 task = None 2081 else: 2082 task = self.sched.next() 2083 if task is not None: 2084 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2085 2086 if self.rqdata.setscenewhitelist is not None: 2087 if self.check_setscenewhitelist(task): 2088 self.task_fail(task, "setscene whitelist") 2089 return True 2090 2091 if task in self.tasks_covered: 2092 logger.debug(2, "Setscene covered task %s", task) 2093 self.task_skip(task, "covered") 2094 return True 2095 2096 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2097 logger.debug(2, "Stamp current task %s", task) 2098 2099 self.task_skip(task, "existing") 2100 self.runq_tasksrun.add(task) 2101 return True 2102 2103 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2104 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2105 startevent = runQueueTaskStarted(task, self.stats, self.rq, 2106 noexec=True) 2107 bb.event.fire(startevent, self.cfgData) 2108 self.runq_running.add(task) 2109 self.stats.taskActive() 2110 if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2111 bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn) 2112 self.task_complete(task) 2113 return True 2114 else: 2115 startevent = runQueueTaskStarted(task, self.stats, self.rq) 2116 bb.event.fire(startevent, self.cfgData) 2117 2118 taskdepdata = self.build_taskdepdata(task) 2119 2120 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2121 taskhash = self.rqdata.get_task_hash(task) 2122 unihash = self.rqdata.get_task_unihash(task) 2123 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2124 if not mc in self.rq.fakeworker: 2125 try: 2126 self.rq.start_fakeworker(self, mc) 2127 except OSError as exc: 2128 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) 2129 self.rq.state = runQueueFailed 2130 self.stats.taskFailed() 2131 return True 2132 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collection.get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") 2133 self.rq.fakeworker[mc].process.stdin.flush() 2134 else: 2135 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collection.get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") 2136 self.rq.worker[mc].process.stdin.flush() 2137 2138 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 2139 self.build_stamps2.append(self.build_stamps[task]) 2140 self.runq_running.add(task) 2141 self.stats.taskActive() 2142 if self.can_start_task(): 2143 return True 2144 2145 if self.stats.active > 0 or self.sq_stats.active > 0: 2146 self.rq.read_workers() 2147 return self.rq.active_fds() 2148 2149 # No more tasks can be run. If we have deferred setscene tasks we should run them. 2150 if self.sq_deferred: 2151 tid = self.sq_deferred.pop(list(self.sq_deferred.keys())[0]) 2152 logger.warning("Runqeueue deadlocked on deferred tasks, forcing task %s" % tid) 2153 self.sq_task_failoutright(tid) 2154 return True 2155 2156 if len(self.failed_tids) != 0: 2157 self.rq.state = runQueueFailed 2158 return True 2159 2160 # Sanity Checks 2161 err = self.summarise_scenequeue_errors() 2162 for task in self.rqdata.runtaskentries: 2163 if task not in self.runq_buildable: 2164 logger.error("Task %s never buildable!", task) 2165 err = True 2166 elif task not in self.runq_running: 2167 logger.error("Task %s never ran!", task) 2168 err = True 2169 elif task not in self.runq_complete: 2170 logger.error("Task %s never completed!", task) 2171 err = True 2172 2173 if err: 2174 self.rq.state = runQueueFailed 2175 else: 2176 self.rq.state = runQueueComplete 2177 2178 return True 2179 2180 def filtermcdeps(self, task, mc, deps): 2181 ret = set() 2182 for dep in deps: 2183 thismc = mc_from_tid(dep) 2184 if thismc != mc: 2185 continue 2186 ret.add(dep) 2187 return ret 2188 2189 # We filter out multiconfig dependencies from taskdepdata we pass to the tasks 2190 # as most code can't handle them 2191 def build_taskdepdata(self, task): 2192 taskdepdata = {} 2193 mc = mc_from_tid(task) 2194 next = self.rqdata.runtaskentries[task].depends.copy() 2195 next.add(task) 2196 next = self.filtermcdeps(task, mc, next) 2197 while next: 2198 additional = [] 2199 for revdep in next: 2200 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2201 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2202 deps = self.rqdata.runtaskentries[revdep].depends 2203 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2204 taskhash = self.rqdata.runtaskentries[revdep].hash 2205 unihash = self.rqdata.runtaskentries[revdep].unihash 2206 deps = self.filtermcdeps(task, mc, deps) 2207 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] 2208 for revdep2 in deps: 2209 if revdep2 not in taskdepdata: 2210 additional.append(revdep2) 2211 next = additional 2212 2213 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2214 return taskdepdata 2215 2216 def update_holdofftasks(self): 2217 2218 if not self.holdoff_need_update: 2219 return 2220 2221 notcovered = set(self.scenequeue_notcovered) 2222 notcovered |= self.cantskip 2223 for tid in self.scenequeue_notcovered: 2224 notcovered |= self.sqdata.sq_covered_tasks[tid] 2225 notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) 2226 notcovered.intersection_update(self.tasks_scenequeue_done) 2227 2228 covered = set(self.scenequeue_covered) 2229 for tid in self.scenequeue_covered: 2230 covered |= self.sqdata.sq_covered_tasks[tid] 2231 covered.difference_update(notcovered) 2232 covered.intersection_update(self.tasks_scenequeue_done) 2233 2234 for tid in notcovered | covered: 2235 if len(self.rqdata.runtaskentries[tid].depends) == 0: 2236 self.setbuildable(tid) 2237 elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): 2238 self.setbuildable(tid) 2239 2240 self.tasks_covered = covered 2241 self.tasks_notcovered = notcovered 2242 2243 self.holdoff_tasks = set() 2244 2245 for tid in self.rqdata.runq_setscene_tids: 2246 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2247 self.holdoff_tasks.add(tid) 2248 2249 for tid in self.holdoff_tasks.copy(): 2250 for dep in self.sqdata.sq_covered_tasks[tid]: 2251 if dep not in self.runq_complete: 2252 self.holdoff_tasks.add(dep) 2253 2254 self.holdoff_need_update = False 2255 2256 def process_possible_migrations(self): 2257 2258 changed = set() 2259 toprocess = set() 2260 for tid, unihash in self.updated_taskhash_queue.copy(): 2261 if tid in self.runq_running and tid not in self.runq_complete: 2262 continue 2263 2264 self.updated_taskhash_queue.remove((tid, unihash)) 2265 2266 if unihash != self.rqdata.runtaskentries[tid].unihash: 2267 hashequiv_logger.verbose("Task %s unihash changed to %s" % (tid, unihash)) 2268 self.rqdata.runtaskentries[tid].unihash = unihash 2269 bb.parse.siggen.set_unihash(tid, unihash) 2270 toprocess.add(tid) 2271 2272 # Work out all tasks which depend upon these 2273 total = set() 2274 next = set() 2275 for p in toprocess: 2276 next |= self.rqdata.runtaskentries[p].revdeps 2277 while next: 2278 current = next.copy() 2279 total = total | next 2280 next = set() 2281 for ntid in current: 2282 next |= self.rqdata.runtaskentries[ntid].revdeps 2283 next.difference_update(total) 2284 2285 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash 2286 next = set() 2287 for p in total: 2288 if len(self.rqdata.runtaskentries[p].depends) == 0: 2289 next.add(p) 2290 elif self.rqdata.runtaskentries[p].depends.isdisjoint(total): 2291 next.add(p) 2292 2293 # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled 2294 while next: 2295 current = next.copy() 2296 next = set() 2297 for tid in current: 2298 if len(self.rqdata.runtaskentries[p].depends) and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): 2299 continue 2300 orighash = self.rqdata.runtaskentries[tid].hash 2301 newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches[mc_from_tid(tid)]) 2302 origuni = self.rqdata.runtaskentries[tid].unihash 2303 newuni = bb.parse.siggen.get_unihash(tid) 2304 # FIXME, need to check it can come from sstate at all for determinism? 2305 remapped = False 2306 if newuni == origuni: 2307 # Nothing to do, we match, skip code below 2308 remapped = True 2309 elif tid in self.scenequeue_covered or tid in self.sq_live: 2310 # Already ran this setscene task or it running. Report the new taskhash 2311 bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches) 2312 hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid)) 2313 remapped = True 2314 2315 if not remapped: 2316 #logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni)) 2317 self.rqdata.runtaskentries[tid].hash = newhash 2318 self.rqdata.runtaskentries[tid].unihash = newuni 2319 changed.add(tid) 2320 2321 next |= self.rqdata.runtaskentries[tid].revdeps 2322 total.remove(tid) 2323 next.intersection_update(total) 2324 2325 if changed: 2326 for mc in self.rq.worker: 2327 self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2328 for mc in self.rq.fakeworker: 2329 self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2330 2331 hashequiv_logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) 2332 2333 for tid in changed: 2334 if tid not in self.rqdata.runq_setscene_tids: 2335 continue 2336 if tid not in self.pending_migrations: 2337 self.pending_migrations.add(tid) 2338 2339 update_tasks = [] 2340 for tid in self.pending_migrations.copy(): 2341 if tid in self.runq_running or tid in self.sq_live: 2342 # Too late, task already running, not much we can do now 2343 self.pending_migrations.remove(tid) 2344 continue 2345 2346 valid = True 2347 # Check no tasks this covers are running 2348 for dep in self.sqdata.sq_covered_tasks[tid]: 2349 if dep in self.runq_running and dep not in self.runq_complete: 2350 hashequiv_logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid)) 2351 valid = False 2352 break 2353 if not valid: 2354 continue 2355 2356 self.pending_migrations.remove(tid) 2357 changed = True 2358 2359 if tid in self.tasks_scenequeue_done: 2360 self.tasks_scenequeue_done.remove(tid) 2361 for dep in self.sqdata.sq_covered_tasks[tid]: 2362 if dep in self.runq_complete and dep not in self.runq_tasksrun: 2363 bb.error("Task %s marked as completed but now needing to rerun? Aborting build." % dep) 2364 self.failed_tids.append(tid) 2365 self.rq.state = runQueueCleanUp 2366 return 2367 2368 if dep not in self.runq_complete: 2369 if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable: 2370 self.tasks_scenequeue_done.remove(dep) 2371 2372 if tid in self.sq_buildable: 2373 self.sq_buildable.remove(tid) 2374 if tid in self.sq_running: 2375 self.sq_running.remove(tid) 2376 harddepfail = False 2377 for t in self.sqdata.sq_harddeps: 2378 if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered: 2379 harddepfail = True 2380 break 2381 if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2382 if tid not in self.sq_buildable: 2383 self.sq_buildable.add(tid) 2384 if len(self.sqdata.sq_revdeps[tid]) == 0: 2385 self.sq_buildable.add(tid) 2386 2387 if tid in self.sqdata.outrightfail: 2388 self.sqdata.outrightfail.remove(tid) 2389 if tid in self.scenequeue_notcovered: 2390 self.scenequeue_notcovered.remove(tid) 2391 if tid in self.scenequeue_covered: 2392 self.scenequeue_covered.remove(tid) 2393 if tid in self.scenequeue_notneeded: 2394 self.scenequeue_notneeded.remove(tid) 2395 2396 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2397 self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) 2398 2399 if tid in self.stampcache: 2400 del self.stampcache[tid] 2401 2402 if tid in self.build_stamps: 2403 del self.build_stamps[tid] 2404 2405 update_tasks.append((tid, harddepfail, tid in self.sqdata.valid)) 2406 2407 if update_tasks: 2408 self.sqdone = False 2409 update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2410 2411 for (tid, harddepfail, origvalid) in update_tasks: 2412 if tid in self.sqdata.valid and not origvalid: 2413 hashequiv_logger.verbose("Setscene task %s became valid" % tid) 2414 if harddepfail: 2415 self.sq_task_failoutright(tid) 2416 2417 if changed: 2418 self.holdoff_need_update = True 2419 2420 def scenequeue_updatecounters(self, task, fail=False): 2421 2422 for dep in sorted(self.sqdata.sq_deps[task]): 2423 if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: 2424 logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) 2425 self.sq_task_failoutright(dep) 2426 continue 2427 if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2428 if dep not in self.sq_buildable: 2429 self.sq_buildable.add(dep) 2430 2431 next = set([task]) 2432 while next: 2433 new = set() 2434 for t in sorted(next): 2435 self.tasks_scenequeue_done.add(t) 2436 # Look down the dependency chain for non-setscene things which this task depends on 2437 # and mark as 'done' 2438 for dep in self.rqdata.runtaskentries[t].depends: 2439 if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: 2440 continue 2441 if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): 2442 new.add(dep) 2443 next = new 2444 2445 self.holdoff_need_update = True 2446 2447 def sq_task_completeoutright(self, task): 2448 """ 2449 Mark a task as completed 2450 Look at the reverse dependencies and mark any task with 2451 completed dependencies as buildable 2452 """ 2453 2454 logger.debug(1, 'Found task %s which could be accelerated', task) 2455 self.scenequeue_covered.add(task) 2456 self.scenequeue_updatecounters(task) 2457 2458 def sq_check_taskfail(self, task): 2459 if self.rqdata.setscenewhitelist is not None: 2460 realtask = task.split('_setscene')[0] 2461 (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) 2462 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2463 if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): 2464 logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) 2465 self.rq.state = runQueueCleanUp 2466 2467 def sq_task_complete(self, task): 2468 self.sq_stats.taskCompleted() 2469 bb.event.fire(sceneQueueTaskCompleted(task, self.sq_stats, self.rq), self.cfgData) 2470 self.sq_task_completeoutright(task) 2471 2472 def sq_task_fail(self, task, result): 2473 self.sq_stats.taskFailed() 2474 bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData) 2475 self.scenequeue_notcovered.add(task) 2476 self.scenequeue_updatecounters(task, True) 2477 self.sq_check_taskfail(task) 2478 2479 def sq_task_failoutright(self, task): 2480 self.sq_running.add(task) 2481 self.sq_buildable.add(task) 2482 self.sq_stats.taskSkipped() 2483 self.sq_stats.taskCompleted() 2484 self.scenequeue_notcovered.add(task) 2485 self.scenequeue_updatecounters(task, True) 2486 2487 def sq_task_skip(self, task): 2488 self.sq_running.add(task) 2489 self.sq_buildable.add(task) 2490 self.sq_task_completeoutright(task) 2491 self.sq_stats.taskSkipped() 2492 self.sq_stats.taskCompleted() 2493 2494 def sq_build_taskdepdata(self, task): 2495 def getsetscenedeps(tid): 2496 deps = set() 2497 (mc, fn, taskname, _) = split_tid_mcfn(tid) 2498 realtid = tid + "_setscene" 2499 idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends 2500 for (depname, idependtask) in idepends: 2501 if depname not in self.rqdata.taskData[mc].build_targets: 2502 continue 2503 2504 depfn = self.rqdata.taskData[mc].build_targets[depname][0] 2505 if depfn is None: 2506 continue 2507 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2508 deps.add(deptid) 2509 return deps 2510 2511 taskdepdata = {} 2512 next = getsetscenedeps(task) 2513 next.add(task) 2514 while next: 2515 additional = [] 2516 for revdep in next: 2517 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2518 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2519 deps = getsetscenedeps(revdep) 2520 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2521 taskhash = self.rqdata.runtaskentries[revdep].hash 2522 unihash = self.rqdata.runtaskentries[revdep].unihash 2523 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] 2524 for revdep2 in deps: 2525 if revdep2 not in taskdepdata: 2526 additional.append(revdep2) 2527 next = additional 2528 2529 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2530 return taskdepdata 2531 2532 def check_setscenewhitelist(self, tid): 2533 # Check task that is going to run against the whitelist 2534 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2535 # Ignore covered tasks 2536 if tid in self.tasks_covered: 2537 return False 2538 # Ignore stamped tasks 2539 if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): 2540 return False 2541 # Ignore noexec tasks 2542 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2543 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2544 return False 2545 2546 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2547 if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): 2548 if tid in self.rqdata.runq_setscene_tids: 2549 msg = 'Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname) 2550 else: 2551 msg = 'Task %s.%s attempted to execute unexpectedly' % (pn, taskname) 2552 for t in self.scenequeue_notcovered: 2553 msg = msg + "\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash) 2554 logger.error(msg + '\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered)) 2555 return True 2556 return False 2557 2558class SQData(object): 2559 def __init__(self): 2560 # SceneQueue dependencies 2561 self.sq_deps = {} 2562 # SceneQueue reverse dependencies 2563 self.sq_revdeps = {} 2564 # Injected inter-setscene task dependencies 2565 self.sq_harddeps = {} 2566 # Cache of stamp files so duplicates can't run in parallel 2567 self.stamps = {} 2568 # Setscene tasks directly depended upon by the build 2569 self.unskippable = set() 2570 # List of setscene tasks which aren't present 2571 self.outrightfail = set() 2572 # A list of normal tasks a setscene task covers 2573 self.sq_covered_tasks = {} 2574 2575def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): 2576 2577 sq_revdeps = {} 2578 sq_revdeps_squash = {} 2579 sq_collated_deps = {} 2580 2581 # We need to construct a dependency graph for the setscene functions. Intermediate 2582 # dependencies between the setscene tasks only complicate the code. This code 2583 # therefore aims to collapse the huge runqueue dependency tree into a smaller one 2584 # only containing the setscene functions. 2585 2586 rqdata.init_progress_reporter.next_stage() 2587 2588 # First process the chains up to the first setscene task. 2589 endpoints = {} 2590 for tid in rqdata.runtaskentries: 2591 sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) 2592 sq_revdeps_squash[tid] = set() 2593 if (len(sq_revdeps[tid]) == 0) and tid not in rqdata.runq_setscene_tids: 2594 #bb.warn("Added endpoint %s" % (tid)) 2595 endpoints[tid] = set() 2596 2597 rqdata.init_progress_reporter.next_stage() 2598 2599 # Secondly process the chains between setscene tasks. 2600 for tid in rqdata.runq_setscene_tids: 2601 sq_collated_deps[tid] = set() 2602 #bb.warn("Added endpoint 2 %s" % (tid)) 2603 for dep in rqdata.runtaskentries[tid].depends: 2604 if tid in sq_revdeps[dep]: 2605 sq_revdeps[dep].remove(tid) 2606 if dep not in endpoints: 2607 endpoints[dep] = set() 2608 #bb.warn(" Added endpoint 3 %s" % (dep)) 2609 endpoints[dep].add(tid) 2610 2611 rqdata.init_progress_reporter.next_stage() 2612 2613 def process_endpoints(endpoints): 2614 newendpoints = {} 2615 for point, task in endpoints.items(): 2616 tasks = set() 2617 if task: 2618 tasks |= task 2619 if sq_revdeps_squash[point]: 2620 tasks |= sq_revdeps_squash[point] 2621 if point not in rqdata.runq_setscene_tids: 2622 for t in tasks: 2623 sq_collated_deps[t].add(point) 2624 sq_revdeps_squash[point] = set() 2625 if point in rqdata.runq_setscene_tids: 2626 sq_revdeps_squash[point] = tasks 2627 tasks = set() 2628 continue 2629 for dep in rqdata.runtaskentries[point].depends: 2630 if point in sq_revdeps[dep]: 2631 sq_revdeps[dep].remove(point) 2632 if tasks: 2633 sq_revdeps_squash[dep] |= tasks 2634 if len(sq_revdeps[dep]) == 0 and dep not in rqdata.runq_setscene_tids: 2635 newendpoints[dep] = task 2636 if len(newendpoints) != 0: 2637 process_endpoints(newendpoints) 2638 2639 process_endpoints(endpoints) 2640 2641 rqdata.init_progress_reporter.next_stage() 2642 2643 # Build a list of tasks which are "unskippable" 2644 # These are direct endpoints referenced by the build upto and including setscene tasks 2645 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon 2646 new = True 2647 for tid in rqdata.runtaskentries: 2648 if len(rqdata.runtaskentries[tid].revdeps) == 0: 2649 sqdata.unskippable.add(tid) 2650 sqdata.unskippable |= sqrq.cantskip 2651 while new: 2652 new = False 2653 orig = sqdata.unskippable.copy() 2654 for tid in sorted(orig, reverse=True): 2655 if tid in rqdata.runq_setscene_tids: 2656 continue 2657 if len(rqdata.runtaskentries[tid].depends) == 0: 2658 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable 2659 sqrq.setbuildable(tid) 2660 sqdata.unskippable |= rqdata.runtaskentries[tid].depends 2661 if sqdata.unskippable != orig: 2662 new = True 2663 2664 sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids) 2665 2666 rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) 2667 2668 # Sanity check all dependencies could be changed to setscene task references 2669 for taskcounter, tid in enumerate(rqdata.runtaskentries): 2670 if tid in rqdata.runq_setscene_tids: 2671 pass 2672 elif len(sq_revdeps_squash[tid]) != 0: 2673 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.") 2674 else: 2675 del sq_revdeps_squash[tid] 2676 rqdata.init_progress_reporter.update(taskcounter) 2677 2678 rqdata.init_progress_reporter.next_stage() 2679 2680 # Resolve setscene inter-task dependencies 2681 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" 2682 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies 2683 for tid in rqdata.runq_setscene_tids: 2684 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2685 realtid = tid + "_setscene" 2686 idepends = rqdata.taskData[mc].taskentries[realtid].idepends 2687 sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", rqdata.dataCaches[mc], taskfn, noextra=True) 2688 for (depname, idependtask) in idepends: 2689 2690 if depname not in rqdata.taskData[mc].build_targets: 2691 continue 2692 2693 depfn = rqdata.taskData[mc].build_targets[depname][0] 2694 if depfn is None: 2695 continue 2696 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2697 if deptid not in rqdata.runtaskentries: 2698 bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) 2699 2700 if not deptid in sqdata.sq_harddeps: 2701 sqdata.sq_harddeps[deptid] = set() 2702 sqdata.sq_harddeps[deptid].add(tid) 2703 2704 sq_revdeps_squash[tid].add(deptid) 2705 # Have to zero this to avoid circular dependencies 2706 sq_revdeps_squash[deptid] = set() 2707 2708 rqdata.init_progress_reporter.next_stage() 2709 2710 for task in sqdata.sq_harddeps: 2711 for dep in sqdata.sq_harddeps[task]: 2712 sq_revdeps_squash[dep].add(task) 2713 2714 rqdata.init_progress_reporter.next_stage() 2715 2716 #for tid in sq_revdeps_squash: 2717 # data = "" 2718 # for dep in sq_revdeps_squash[tid]: 2719 # data = data + "\n %s" % dep 2720 # bb.warn("Task %s_setscene: is %s " % (tid, data)) 2721 2722 sqdata.sq_revdeps = sq_revdeps_squash 2723 sqdata.sq_covered_tasks = sq_collated_deps 2724 2725 # Build reverse version of revdeps to populate deps structure 2726 for tid in sqdata.sq_revdeps: 2727 sqdata.sq_deps[tid] = set() 2728 for tid in sqdata.sq_revdeps: 2729 for dep in sqdata.sq_revdeps[tid]: 2730 sqdata.sq_deps[dep].add(tid) 2731 2732 rqdata.init_progress_reporter.next_stage() 2733 2734 sqdata.multiconfigs = set() 2735 for tid in sqdata.sq_revdeps: 2736 sqdata.multiconfigs.add(mc_from_tid(tid)) 2737 if len(sqdata.sq_revdeps[tid]) == 0: 2738 sqrq.sq_buildable.add(tid) 2739 2740 rqdata.init_progress_reporter.finish() 2741 2742 sqdata.noexec = set() 2743 sqdata.stamppresent = set() 2744 sqdata.valid = set() 2745 2746 update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True) 2747 2748def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): 2749 2750 tocheck = set() 2751 2752 for tid in sorted(tids): 2753 if tid in sqdata.stamppresent: 2754 sqdata.stamppresent.remove(tid) 2755 if tid in sqdata.valid: 2756 sqdata.valid.remove(tid) 2757 2758 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2759 2760 taskdep = rqdata.dataCaches[mc].task_deps[taskfn] 2761 2762 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2763 sqdata.noexec.add(tid) 2764 sqrq.sq_task_skip(tid) 2765 bb.build.make_stamp(taskname + "_setscene", rqdata.dataCaches[mc], taskfn) 2766 continue 2767 2768 if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache): 2769 logger.debug(2, 'Setscene stamp current for task %s', tid) 2770 sqdata.stamppresent.add(tid) 2771 sqrq.sq_task_skip(tid) 2772 continue 2773 2774 if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache): 2775 logger.debug(2, 'Normal stamp current for task %s', tid) 2776 sqdata.stamppresent.add(tid) 2777 sqrq.sq_task_skip(tid) 2778 continue 2779 2780 tocheck.add(tid) 2781 2782 sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) 2783 2784 sqdata.hashes = {} 2785 for mc in sorted(sqdata.multiconfigs): 2786 for tid in sorted(sqdata.sq_revdeps): 2787 if mc_from_tid(tid) != mc: 2788 continue 2789 if tid in sqdata.stamppresent: 2790 continue 2791 if tid in sqdata.valid: 2792 continue 2793 if tid in sqdata.noexec: 2794 continue 2795 if tid in sqrq.scenequeue_notcovered: 2796 continue 2797 sqdata.outrightfail.add(tid) 2798 2799 h = pending_hash_index(tid, rqdata) 2800 if h not in sqdata.hashes: 2801 sqdata.hashes[h] = tid 2802 else: 2803 sqrq.sq_deferred[tid] = sqdata.hashes[h] 2804 bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h])) 2805 2806 2807class TaskFailure(Exception): 2808 """ 2809 Exception raised when a task in a runqueue fails 2810 """ 2811 def __init__(self, x): 2812 self.args = x 2813 2814 2815class runQueueExitWait(bb.event.Event): 2816 """ 2817 Event when waiting for task processes to exit 2818 """ 2819 2820 def __init__(self, remain): 2821 self.remain = remain 2822 self.message = "Waiting for %s active tasks to finish" % remain 2823 bb.event.Event.__init__(self) 2824 2825class runQueueEvent(bb.event.Event): 2826 """ 2827 Base runQueue event class 2828 """ 2829 def __init__(self, task, stats, rq): 2830 self.taskid = task 2831 self.taskstring = task 2832 self.taskname = taskname_from_tid(task) 2833 self.taskfile = fn_from_tid(task) 2834 self.taskhash = rq.rqdata.get_task_hash(task) 2835 self.stats = stats.copy() 2836 bb.event.Event.__init__(self) 2837 2838class sceneQueueEvent(runQueueEvent): 2839 """ 2840 Base sceneQueue event class 2841 """ 2842 def __init__(self, task, stats, rq, noexec=False): 2843 runQueueEvent.__init__(self, task, stats, rq) 2844 self.taskstring = task + "_setscene" 2845 self.taskname = taskname_from_tid(task) + "_setscene" 2846 self.taskfile = fn_from_tid(task) 2847 self.taskhash = rq.rqdata.get_task_hash(task) 2848 2849class runQueueTaskStarted(runQueueEvent): 2850 """ 2851 Event notifying a task was started 2852 """ 2853 def __init__(self, task, stats, rq, noexec=False): 2854 runQueueEvent.__init__(self, task, stats, rq) 2855 self.noexec = noexec 2856 2857class sceneQueueTaskStarted(sceneQueueEvent): 2858 """ 2859 Event notifying a setscene task was started 2860 """ 2861 def __init__(self, task, stats, rq, noexec=False): 2862 sceneQueueEvent.__init__(self, task, stats, rq) 2863 self.noexec = noexec 2864 2865class runQueueTaskFailed(runQueueEvent): 2866 """ 2867 Event notifying a task failed 2868 """ 2869 def __init__(self, task, stats, exitcode, rq): 2870 runQueueEvent.__init__(self, task, stats, rq) 2871 self.exitcode = exitcode 2872 2873 def __str__(self): 2874 return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) 2875 2876class sceneQueueTaskFailed(sceneQueueEvent): 2877 """ 2878 Event notifying a setscene task failed 2879 """ 2880 def __init__(self, task, stats, exitcode, rq): 2881 sceneQueueEvent.__init__(self, task, stats, rq) 2882 self.exitcode = exitcode 2883 2884 def __str__(self): 2885 return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode) 2886 2887class sceneQueueComplete(sceneQueueEvent): 2888 """ 2889 Event when all the sceneQueue tasks are complete 2890 """ 2891 def __init__(self, stats, rq): 2892 self.stats = stats.copy() 2893 bb.event.Event.__init__(self) 2894 2895class runQueueTaskCompleted(runQueueEvent): 2896 """ 2897 Event notifying a task completed 2898 """ 2899 2900class sceneQueueTaskCompleted(sceneQueueEvent): 2901 """ 2902 Event notifying a setscene task completed 2903 """ 2904 2905class runQueueTaskSkipped(runQueueEvent): 2906 """ 2907 Event notifying a task was skipped 2908 """ 2909 def __init__(self, task, stats, rq, reason): 2910 runQueueEvent.__init__(self, task, stats, rq) 2911 self.reason = reason 2912 2913class taskUniHashUpdate(bb.event.Event): 2914 """ 2915 Base runQueue event class 2916 """ 2917 def __init__(self, task, unihash): 2918 self.taskid = task 2919 self.unihash = unihash 2920 bb.event.Event.__init__(self) 2921 2922class runQueuePipe(): 2923 """ 2924 Abstraction for a pipe between a worker thread and the server 2925 """ 2926 def __init__(self, pipein, pipeout, d, rq, rqexec): 2927 self.input = pipein 2928 if pipeout: 2929 pipeout.close() 2930 bb.utils.nonblockingfd(self.input) 2931 self.queue = b"" 2932 self.d = d 2933 self.rq = rq 2934 self.rqexec = rqexec 2935 2936 def setrunqueueexec(self, rqexec): 2937 self.rqexec = rqexec 2938 2939 def read(self): 2940 for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]: 2941 for worker in workers.values(): 2942 worker.process.poll() 2943 if worker.process.returncode is not None and not self.rq.teardown: 2944 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode))) 2945 self.rq.finish_runqueue(True) 2946 2947 start = len(self.queue) 2948 try: 2949 self.queue = self.queue + (self.input.read(102400) or b"") 2950 except (OSError, IOError) as e: 2951 if e.errno != errno.EAGAIN: 2952 raise 2953 end = len(self.queue) 2954 found = True 2955 while found and len(self.queue): 2956 found = False 2957 index = self.queue.find(b"</event>") 2958 while index != -1 and self.queue.startswith(b"<event>"): 2959 try: 2960 event = pickle.loads(self.queue[7:index]) 2961 except ValueError as e: 2962 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) 2963 bb.event.fire_from_worker(event, self.d) 2964 if isinstance(event, taskUniHashUpdate): 2965 self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) 2966 found = True 2967 self.queue = self.queue[index+8:] 2968 index = self.queue.find(b"</event>") 2969 index = self.queue.find(b"</exitcode>") 2970 while index != -1 and self.queue.startswith(b"<exitcode>"): 2971 try: 2972 task, status = pickle.loads(self.queue[10:index]) 2973 except ValueError as e: 2974 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) 2975 self.rqexec.runqueue_process_waitpid(task, status) 2976 found = True 2977 self.queue = self.queue[index+11:] 2978 index = self.queue.find(b"</exitcode>") 2979 return (end > start) 2980 2981 def close(self): 2982 while self.read(): 2983 continue 2984 if len(self.queue) > 0: 2985 print("Warning, worker left partial message: %s" % self.queue) 2986 self.input.close() 2987 2988def get_setscene_enforce_whitelist(d): 2989 if d.getVar('BB_SETSCENE_ENFORCE') != '1': 2990 return None 2991 whitelist = (d.getVar("BB_SETSCENE_ENFORCE_WHITELIST") or "").split() 2992 outlist = [] 2993 for item in whitelist[:]: 2994 if item.startswith('%:'): 2995 for target in sys.argv[1:]: 2996 if not target.startswith('-'): 2997 outlist.append(target.split(':')[0] + ':' + item.split(':')[1]) 2998 else: 2999 outlist.append(item) 3000 return outlist 3001 3002def check_setscene_enforce_whitelist(pn, taskname, whitelist): 3003 import fnmatch 3004 if whitelist is not None: 3005 item = '%s:%s' % (pn, taskname) 3006 for whitelist_item in whitelist: 3007 if fnmatch.fnmatch(item, whitelist_item): 3008 return True 3009 return False 3010 return True 3011