1""" 2BitBake 'RunQueue' implementation 3 4Handles preparation and execution of a queue of tasks 5""" 6 7# Copyright (C) 2006-2007 Richard Purdie 8# 9# SPDX-License-Identifier: GPL-2.0-only 10# 11 12import copy 13import os 14import sys 15import stat 16import errno 17import logging 18import re 19import bb 20from bb import msg, event 21from bb import monitordisk 22import subprocess 23import pickle 24from multiprocessing import Process 25import shlex 26import pprint 27 28bblogger = logging.getLogger("BitBake") 29logger = logging.getLogger("BitBake.RunQueue") 30hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv") 31 32__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' ) 33 34def fn_from_tid(tid): 35 return tid.rsplit(":", 1)[0] 36 37def taskname_from_tid(tid): 38 return tid.rsplit(":", 1)[1] 39 40def mc_from_tid(tid): 41 if tid.startswith('mc:') and tid.count(':') >= 2: 42 return tid.split(':')[1] 43 return "" 44 45def split_tid(tid): 46 (mc, fn, taskname, _) = split_tid_mcfn(tid) 47 return (mc, fn, taskname) 48 49def split_mc(n): 50 if n.startswith("mc:") and n.count(':') >= 2: 51 _, mc, n = n.split(":", 2) 52 return (mc, n) 53 return ('', n) 54 55def split_tid_mcfn(tid): 56 if tid.startswith('mc:') and tid.count(':') >= 2: 57 elems = tid.split(':') 58 mc = elems[1] 59 fn = ":".join(elems[2:-1]) 60 taskname = elems[-1] 61 mcfn = "mc:" + mc + ":" + fn 62 else: 63 tid = tid.rsplit(":", 1) 64 mc = "" 65 fn = tid[0] 66 taskname = tid[1] 67 mcfn = fn 68 69 return (mc, fn, taskname, mcfn) 70 71def build_tid(mc, fn, taskname): 72 if mc: 73 return "mc:" + mc + ":" + fn + ":" + taskname 74 return fn + ":" + taskname 75 76# Index used to pair up potentially matching multiconfig tasks 77# We match on PN, taskname and hash being equal 78def pending_hash_index(tid, rqdata): 79 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 80 pn = rqdata.dataCaches[mc].pkg_fn[taskfn] 81 h = rqdata.runtaskentries[tid].unihash 82 return pn + ":" + "taskname" + h 83 84class RunQueueStats: 85 """ 86 Holds statistics on the tasks handled by the associated runQueue 87 """ 88 def __init__(self, total, setscene_total): 89 self.completed = 0 90 self.skipped = 0 91 self.failed = 0 92 self.active = 0 93 self.setscene_active = 0 94 self.setscene_covered = 0 95 self.setscene_notcovered = 0 96 self.setscene_total = setscene_total 97 self.total = total 98 99 def copy(self): 100 obj = self.__class__(self.total, self.setscene_total) 101 obj.__dict__.update(self.__dict__) 102 return obj 103 104 def taskFailed(self): 105 self.active = self.active - 1 106 self.failed = self.failed + 1 107 108 def taskCompleted(self): 109 self.active = self.active - 1 110 self.completed = self.completed + 1 111 112 def taskSkipped(self): 113 self.active = self.active + 1 114 self.skipped = self.skipped + 1 115 116 def taskActive(self): 117 self.active = self.active + 1 118 119 def updateCovered(self, covered, notcovered): 120 self.setscene_covered = covered 121 self.setscene_notcovered = notcovered 122 123 def updateActiveSetscene(self, active): 124 self.setscene_active = active 125 126# These values indicate the next step due to be run in the 127# runQueue state machine 128runQueuePrepare = 2 129runQueueSceneInit = 3 130runQueueRunning = 6 131runQueueFailed = 7 132runQueueCleanUp = 8 133runQueueComplete = 9 134 135class RunQueueScheduler(object): 136 """ 137 Control the order tasks are scheduled in. 138 """ 139 name = "basic" 140 141 def __init__(self, runqueue, rqdata): 142 """ 143 The default scheduler just returns the first buildable task (the 144 priority map is sorted by task number) 145 """ 146 self.rq = runqueue 147 self.rqdata = rqdata 148 self.numTasks = len(self.rqdata.runtaskentries) 149 150 self.prio_map = [self.rqdata.runtaskentries.keys()] 151 152 self.buildable = set() 153 self.skip_maxthread = {} 154 self.stamps = {} 155 for tid in self.rqdata.runtaskentries: 156 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 157 self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 158 if tid in self.rq.runq_buildable: 159 self.buildable.append(tid) 160 161 self.rev_prio_map = None 162 163 def next_buildable_task(self): 164 """ 165 Return the id of the first task we find that is buildable 166 """ 167 # Once tasks are running we don't need to worry about them again 168 self.buildable.difference_update(self.rq.runq_running) 169 buildable = set(self.buildable) 170 buildable.difference_update(self.rq.holdoff_tasks) 171 buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered) 172 if not buildable: 173 return None 174 175 # Filter out tasks that have a max number of threads that have been exceeded 176 skip_buildable = {} 177 for running in self.rq.runq_running.difference(self.rq.runq_complete): 178 rtaskname = taskname_from_tid(running) 179 if rtaskname not in self.skip_maxthread: 180 self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads") 181 if not self.skip_maxthread[rtaskname]: 182 continue 183 if rtaskname in skip_buildable: 184 skip_buildable[rtaskname] += 1 185 else: 186 skip_buildable[rtaskname] = 1 187 188 if len(buildable) == 1: 189 tid = buildable.pop() 190 taskname = taskname_from_tid(tid) 191 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 192 return None 193 stamp = self.stamps[tid] 194 if stamp not in self.rq.build_stamps.values(): 195 return tid 196 197 if not self.rev_prio_map: 198 self.rev_prio_map = {} 199 for tid in self.rqdata.runtaskentries: 200 self.rev_prio_map[tid] = self.prio_map.index(tid) 201 202 best = None 203 bestprio = None 204 for tid in buildable: 205 taskname = taskname_from_tid(tid) 206 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 207 continue 208 prio = self.rev_prio_map[tid] 209 if bestprio is None or bestprio > prio: 210 stamp = self.stamps[tid] 211 if stamp in self.rq.build_stamps.values(): 212 continue 213 bestprio = prio 214 best = tid 215 216 return best 217 218 def next(self): 219 """ 220 Return the id of the task we should build next 221 """ 222 if self.rq.can_start_task(): 223 return self.next_buildable_task() 224 225 def newbuildable(self, task): 226 self.buildable.add(task) 227 228 def removebuildable(self, task): 229 self.buildable.remove(task) 230 231 def describe_task(self, taskid): 232 result = 'ID %s' % taskid 233 if self.rev_prio_map: 234 result = result + (' pri %d' % self.rev_prio_map[taskid]) 235 return result 236 237 def dump_prio(self, comment): 238 bb.debug(3, '%s (most important first):\n%s' % 239 (comment, 240 '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for 241 index, taskid in enumerate(self.prio_map)]))) 242 243class RunQueueSchedulerSpeed(RunQueueScheduler): 244 """ 245 A scheduler optimised for speed. The priority map is sorted by task weight, 246 heavier weighted tasks (tasks needed by the most other tasks) are run first. 247 """ 248 name = "speed" 249 250 def __init__(self, runqueue, rqdata): 251 """ 252 The priority map is sorted by task weight. 253 """ 254 RunQueueScheduler.__init__(self, runqueue, rqdata) 255 256 weights = {} 257 for tid in self.rqdata.runtaskentries: 258 weight = self.rqdata.runtaskentries[tid].weight 259 if not weight in weights: 260 weights[weight] = [] 261 weights[weight].append(tid) 262 263 self.prio_map = [] 264 for weight in sorted(weights): 265 for w in weights[weight]: 266 self.prio_map.append(w) 267 268 self.prio_map.reverse() 269 270class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): 271 """ 272 A scheduler optimised to complete .bb files as quickly as possible. The 273 priority map is sorted by task weight, but then reordered so once a given 274 .bb file starts to build, it's completed as quickly as possible by 275 running all tasks related to the same .bb file one after the after. 276 This works well where disk space is at a premium and classes like OE's 277 rm_work are in force. 278 """ 279 name = "completion" 280 281 def __init__(self, runqueue, rqdata): 282 super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata) 283 284 # Extract list of tasks for each recipe, with tasks sorted 285 # ascending from "must run first" (typically do_fetch) to 286 # "runs last" (do_build). The speed scheduler prioritizes 287 # tasks that must run first before the ones that run later; 288 # this is what we depend on here. 289 task_lists = {} 290 for taskid in self.prio_map: 291 fn, taskname = taskid.rsplit(':', 1) 292 task_lists.setdefault(fn, []).append(taskname) 293 294 # Now unify the different task lists. The strategy is that 295 # common tasks get skipped and new ones get inserted after the 296 # preceeding common one(s) as they are found. Because task 297 # lists should differ only by their number of tasks, but not 298 # the ordering of the common tasks, this should result in a 299 # deterministic result that is a superset of the individual 300 # task ordering. 301 all_tasks = [] 302 for recipe, new_tasks in task_lists.items(): 303 index = 0 304 old_task = all_tasks[index] if index < len(all_tasks) else None 305 for new_task in new_tasks: 306 if old_task == new_task: 307 # Common task, skip it. This is the fast-path which 308 # avoids a full search. 309 index += 1 310 old_task = all_tasks[index] if index < len(all_tasks) else None 311 else: 312 try: 313 index = all_tasks.index(new_task) 314 # Already present, just not at the current 315 # place. We re-synchronized by changing the 316 # index so that it matches again. Now 317 # move on to the next existing task. 318 index += 1 319 old_task = all_tasks[index] if index < len(all_tasks) else None 320 except ValueError: 321 # Not present. Insert before old_task, which 322 # remains the same (but gets shifted back). 323 all_tasks.insert(index, new_task) 324 index += 1 325 bb.debug(3, 'merged task list: %s' % all_tasks) 326 327 # Now reverse the order so that tasks that finish the work on one 328 # recipe are considered more imporant (= come first). The ordering 329 # is now so that do_build is most important. 330 all_tasks.reverse() 331 332 # Group tasks of the same kind before tasks of less important 333 # kinds at the head of the queue (because earlier = lower 334 # priority number = runs earlier), while preserving the 335 # ordering by recipe. If recipe foo is more important than 336 # bar, then the goal is to work on foo's do_populate_sysroot 337 # before bar's do_populate_sysroot and on the more important 338 # tasks of foo before any of the less important tasks in any 339 # other recipe (if those other recipes are more important than 340 # foo). 341 # 342 # All of this only applies when tasks are runable. Explicit 343 # dependencies still override this ordering by priority. 344 # 345 # Here's an example why this priority re-ordering helps with 346 # minimizing disk usage. Consider a recipe foo with a higher 347 # priority than bar where foo DEPENDS on bar. Then the 348 # implicit rule (from base.bbclass) is that foo's do_configure 349 # depends on bar's do_populate_sysroot. This ensures that 350 # bar's do_populate_sysroot gets done first. Normally the 351 # tasks from foo would continue to run once that is done, and 352 # bar only gets completed and cleaned up later. By ordering 353 # bar's task that depend on bar's do_populate_sysroot before foo's 354 # do_configure, that problem gets avoided. 355 task_index = 0 356 self.dump_prio('original priorities') 357 for task in all_tasks: 358 for index in range(task_index, self.numTasks): 359 taskid = self.prio_map[index] 360 taskname = taskid.rsplit(':', 1)[1] 361 if taskname == task: 362 del self.prio_map[index] 363 self.prio_map.insert(task_index, taskid) 364 task_index += 1 365 self.dump_prio('completion priorities') 366 367class RunTaskEntry(object): 368 def __init__(self): 369 self.depends = set() 370 self.revdeps = set() 371 self.hash = None 372 self.unihash = None 373 self.task = None 374 self.weight = 1 375 376class RunQueueData: 377 """ 378 BitBake Run Queue implementation 379 """ 380 def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets): 381 self.cooker = cooker 382 self.dataCaches = dataCaches 383 self.taskData = taskData 384 self.targets = targets 385 self.rq = rq 386 self.warn_multi_bb = False 387 388 self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split() 389 self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets) 390 self.setscene_ignore_tasks_checked = False 391 self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1") 392 self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter() 393 394 self.reset() 395 396 def reset(self): 397 self.runtaskentries = {} 398 399 def runq_depends_names(self, ids): 400 import re 401 ret = [] 402 for id in ids: 403 nam = os.path.basename(id) 404 nam = re.sub("_[^,]*,", ",", nam) 405 ret.extend([nam]) 406 return ret 407 408 def get_task_hash(self, tid): 409 return self.runtaskentries[tid].hash 410 411 def get_task_unihash(self, tid): 412 return self.runtaskentries[tid].unihash 413 414 def get_user_idstring(self, tid, task_name_suffix = ""): 415 return tid + task_name_suffix 416 417 def get_short_user_idstring(self, task, task_name_suffix = ""): 418 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 419 pn = self.dataCaches[mc].pkg_fn[taskfn] 420 taskname = taskname_from_tid(task) + task_name_suffix 421 return "%s:%s" % (pn, taskname) 422 423 def circular_depchains_handler(self, tasks): 424 """ 425 Some tasks aren't buildable, likely due to circular dependency issues. 426 Identify the circular dependencies and print them in a user readable format. 427 """ 428 from copy import deepcopy 429 430 valid_chains = [] 431 explored_deps = {} 432 msgs = [] 433 434 class TooManyLoops(Exception): 435 pass 436 437 def chain_reorder(chain): 438 """ 439 Reorder a dependency chain so the lowest task id is first 440 """ 441 lowest = 0 442 new_chain = [] 443 for entry in range(len(chain)): 444 if chain[entry] < chain[lowest]: 445 lowest = entry 446 new_chain.extend(chain[lowest:]) 447 new_chain.extend(chain[:lowest]) 448 return new_chain 449 450 def chain_compare_equal(chain1, chain2): 451 """ 452 Compare two dependency chains and see if they're the same 453 """ 454 if len(chain1) != len(chain2): 455 return False 456 for index in range(len(chain1)): 457 if chain1[index] != chain2[index]: 458 return False 459 return True 460 461 def chain_array_contains(chain, chain_array): 462 """ 463 Return True if chain_array contains chain 464 """ 465 for ch in chain_array: 466 if chain_compare_equal(ch, chain): 467 return True 468 return False 469 470 def find_chains(tid, prev_chain): 471 prev_chain.append(tid) 472 total_deps = [] 473 total_deps.extend(self.runtaskentries[tid].revdeps) 474 for revdep in self.runtaskentries[tid].revdeps: 475 if revdep in prev_chain: 476 idx = prev_chain.index(revdep) 477 # To prevent duplicates, reorder the chain to start with the lowest taskid 478 # and search through an array of those we've already printed 479 chain = prev_chain[idx:] 480 new_chain = chain_reorder(chain) 481 if not chain_array_contains(new_chain, valid_chains): 482 valid_chains.append(new_chain) 483 msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) 484 for dep in new_chain: 485 msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends))) 486 msgs.append("\n") 487 if len(valid_chains) > 10: 488 msgs.append("Halted dependency loops search after 10 matches.\n") 489 raise TooManyLoops 490 continue 491 scan = False 492 if revdep not in explored_deps: 493 scan = True 494 elif revdep in explored_deps[revdep]: 495 scan = True 496 else: 497 for dep in prev_chain: 498 if dep in explored_deps[revdep]: 499 scan = True 500 if scan: 501 find_chains(revdep, copy.deepcopy(prev_chain)) 502 for dep in explored_deps[revdep]: 503 if dep not in total_deps: 504 total_deps.append(dep) 505 506 explored_deps[tid] = total_deps 507 508 try: 509 for task in tasks: 510 find_chains(task, []) 511 except TooManyLoops: 512 pass 513 514 return msgs 515 516 def calculate_task_weights(self, endpoints): 517 """ 518 Calculate a number representing the "weight" of each task. Heavier weighted tasks 519 have more dependencies and hence should be executed sooner for maximum speed. 520 521 This function also sanity checks the task list finding tasks that are not 522 possible to execute due to circular dependencies. 523 """ 524 525 numTasks = len(self.runtaskentries) 526 weight = {} 527 deps_left = {} 528 task_done = {} 529 530 for tid in self.runtaskentries: 531 task_done[tid] = False 532 weight[tid] = 1 533 deps_left[tid] = len(self.runtaskentries[tid].revdeps) 534 535 for tid in endpoints: 536 weight[tid] = 10 537 task_done[tid] = True 538 539 while True: 540 next_points = [] 541 for tid in endpoints: 542 for revdep in self.runtaskentries[tid].depends: 543 weight[revdep] = weight[revdep] + weight[tid] 544 deps_left[revdep] = deps_left[revdep] - 1 545 if deps_left[revdep] == 0: 546 next_points.append(revdep) 547 task_done[revdep] = True 548 endpoints = next_points 549 if not next_points: 550 break 551 552 # Circular dependency sanity check 553 problem_tasks = [] 554 for tid in self.runtaskentries: 555 if task_done[tid] is False or deps_left[tid] != 0: 556 problem_tasks.append(tid) 557 logger.debug2("Task %s is not buildable", tid) 558 logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) 559 self.runtaskentries[tid].weight = weight[tid] 560 561 if problem_tasks: 562 message = "%s unbuildable tasks were found.\n" % len(problem_tasks) 563 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" 564 message = message + "Identifying dependency loops (this may take a short while)...\n" 565 logger.error(message) 566 567 msgs = self.circular_depchains_handler(problem_tasks) 568 569 message = "\n" 570 for msg in msgs: 571 message = message + msg 572 bb.msg.fatal("RunQueue", message) 573 574 return weight 575 576 def prepare(self): 577 """ 578 Turn a set of taskData into a RunQueue and compute data needed 579 to optimise the execution order. 580 """ 581 582 runq_build = {} 583 recursivetasks = {} 584 recursiveitasks = {} 585 recursivetasksselfref = set() 586 587 taskData = self.taskData 588 589 found = False 590 for mc in self.taskData: 591 if taskData[mc].taskentries: 592 found = True 593 break 594 if not found: 595 # Nothing to do 596 return 0 597 598 self.init_progress_reporter.start() 599 self.init_progress_reporter.next_stage() 600 601 # Step A - Work out a list of tasks to run 602 # 603 # Taskdata gives us a list of possible providers for every build and run 604 # target ordered by priority. It also gives information on each of those 605 # providers. 606 # 607 # To create the actual list of tasks to execute we fix the list of 608 # providers and then resolve the dependencies into task IDs. This 609 # process is repeated for each type of dependency (tdepends, deptask, 610 # rdeptast, recrdeptask, idepends). 611 612 def add_build_dependencies(depids, tasknames, depends, mc): 613 for depname in depids: 614 # Won't be in build_targets if ASSUME_PROVIDED 615 if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]: 616 continue 617 depdata = taskData[mc].build_targets[depname][0] 618 if depdata is None: 619 continue 620 for taskname in tasknames: 621 t = depdata + ":" + taskname 622 if t in taskData[mc].taskentries: 623 depends.add(t) 624 625 def add_runtime_dependencies(depids, tasknames, depends, mc): 626 for depname in depids: 627 if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]: 628 continue 629 depdata = taskData[mc].run_targets[depname][0] 630 if depdata is None: 631 continue 632 for taskname in tasknames: 633 t = depdata + ":" + taskname 634 if t in taskData[mc].taskentries: 635 depends.add(t) 636 637 def add_mc_dependencies(mc, tid): 638 mcdeps = taskData[mc].get_mcdepends() 639 for dep in mcdeps: 640 mcdependency = dep.split(':') 641 pn = mcdependency[3] 642 frommc = mcdependency[1] 643 mcdep = mcdependency[2] 644 deptask = mcdependency[4] 645 if mc == frommc: 646 fn = taskData[mcdep].build_targets[pn][0] 647 newdep = '%s:%s' % (fn,deptask) 648 taskData[mc].taskentries[tid].tdepends.append(newdep) 649 650 for mc in taskData: 651 for tid in taskData[mc].taskentries: 652 653 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 654 #runtid = build_tid(mc, fn, taskname) 655 656 #logger.debug2("Processing %s,%s:%s", mc, fn, taskname) 657 658 depends = set() 659 task_deps = self.dataCaches[mc].task_deps[taskfn] 660 661 self.runtaskentries[tid] = RunTaskEntry() 662 663 if fn in taskData[mc].failed_fns: 664 continue 665 666 # We add multiconfig dependencies before processing internal task deps (tdepends) 667 if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']: 668 add_mc_dependencies(mc, tid) 669 670 # Resolve task internal dependencies 671 # 672 # e.g. addtask before X after Y 673 for t in taskData[mc].taskentries[tid].tdepends: 674 (depmc, depfn, deptaskname, _) = split_tid_mcfn(t) 675 depends.add(build_tid(depmc, depfn, deptaskname)) 676 677 # Resolve 'deptask' dependencies 678 # 679 # e.g. do_sometask[deptask] = "do_someothertask" 680 # (makes sure sometask runs after someothertask of all DEPENDS) 681 if 'deptask' in task_deps and taskname in task_deps['deptask']: 682 tasknames = task_deps['deptask'][taskname].split() 683 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 684 685 # Resolve 'rdeptask' dependencies 686 # 687 # e.g. do_sometask[rdeptask] = "do_someothertask" 688 # (makes sure sometask runs after someothertask of all RDEPENDS) 689 if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']: 690 tasknames = task_deps['rdeptask'][taskname].split() 691 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 692 693 # Resolve inter-task dependencies 694 # 695 # e.g. do_sometask[depends] = "targetname:do_someothertask" 696 # (makes sure sometask runs after targetname's someothertask) 697 idepends = taskData[mc].taskentries[tid].idepends 698 for (depname, idependtask) in idepends: 699 if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps: 700 # Won't be in build_targets if ASSUME_PROVIDED 701 depdata = taskData[mc].build_targets[depname][0] 702 if depdata is not None: 703 t = depdata + ":" + idependtask 704 depends.add(t) 705 if t not in taskData[mc].taskentries: 706 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 707 irdepends = taskData[mc].taskentries[tid].irdepends 708 for (depname, idependtask) in irdepends: 709 if depname in taskData[mc].run_targets: 710 # Won't be in run_targets if ASSUME_PROVIDED 711 if not taskData[mc].run_targets[depname]: 712 continue 713 depdata = taskData[mc].run_targets[depname][0] 714 if depdata is not None: 715 t = depdata + ":" + idependtask 716 depends.add(t) 717 if t not in taskData[mc].taskentries: 718 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 719 720 # Resolve recursive 'recrdeptask' dependencies (Part A) 721 # 722 # e.g. do_sometask[recrdeptask] = "do_someothertask" 723 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 724 # We cover the recursive part of the dependencies below 725 if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']: 726 tasknames = task_deps['recrdeptask'][taskname].split() 727 recursivetasks[tid] = tasknames 728 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 729 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 730 if taskname in tasknames: 731 recursivetasksselfref.add(tid) 732 733 if 'recideptask' in task_deps and taskname in task_deps['recideptask']: 734 recursiveitasks[tid] = [] 735 for t in task_deps['recideptask'][taskname].split(): 736 newdep = build_tid(mc, fn, t) 737 recursiveitasks[tid].append(newdep) 738 739 self.runtaskentries[tid].depends = depends 740 # Remove all self references 741 self.runtaskentries[tid].depends.discard(tid) 742 743 #self.dump_data() 744 745 self.init_progress_reporter.next_stage() 746 747 # Resolve recursive 'recrdeptask' dependencies (Part B) 748 # 749 # e.g. do_sometask[recrdeptask] = "do_someothertask" 750 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 751 # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed 752 753 # Generating/interating recursive lists of dependencies is painful and potentially slow 754 # Precompute recursive task dependencies here by: 755 # a) create a temp list of reverse dependencies (revdeps) 756 # b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0) 757 # c) combine the total list of dependencies in cumulativedeps 758 # d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower) 759 760 761 revdeps = {} 762 deps = {} 763 cumulativedeps = {} 764 for tid in self.runtaskentries: 765 deps[tid] = set(self.runtaskentries[tid].depends) 766 revdeps[tid] = set() 767 cumulativedeps[tid] = set() 768 # Generate a temp list of reverse dependencies 769 for tid in self.runtaskentries: 770 for dep in self.runtaskentries[tid].depends: 771 revdeps[dep].add(tid) 772 # Find the dependency chain endpoints 773 endpoints = set() 774 for tid in self.runtaskentries: 775 if not deps[tid]: 776 endpoints.add(tid) 777 # Iterate the chains collating dependencies 778 while endpoints: 779 next = set() 780 for tid in endpoints: 781 for dep in revdeps[tid]: 782 cumulativedeps[dep].add(fn_from_tid(tid)) 783 cumulativedeps[dep].update(cumulativedeps[tid]) 784 if tid in deps[dep]: 785 deps[dep].remove(tid) 786 if not deps[dep]: 787 next.add(dep) 788 endpoints = next 789 #for tid in deps: 790 # if deps[tid]: 791 # bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid])) 792 793 # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to 794 # resolve these recursively until we aren't adding any further extra dependencies 795 extradeps = True 796 while extradeps: 797 extradeps = 0 798 for tid in recursivetasks: 799 tasknames = recursivetasks[tid] 800 801 totaldeps = set(self.runtaskentries[tid].depends) 802 if tid in recursiveitasks: 803 totaldeps.update(recursiveitasks[tid]) 804 for dep in recursiveitasks[tid]: 805 if dep not in self.runtaskentries: 806 continue 807 totaldeps.update(self.runtaskentries[dep].depends) 808 809 deps = set() 810 for dep in totaldeps: 811 if dep in cumulativedeps: 812 deps.update(cumulativedeps[dep]) 813 814 for t in deps: 815 for taskname in tasknames: 816 newtid = t + ":" + taskname 817 if newtid == tid: 818 continue 819 if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends: 820 extradeps += 1 821 self.runtaskentries[tid].depends.add(newtid) 822 823 # Handle recursive tasks which depend upon other recursive tasks 824 deps = set() 825 for dep in self.runtaskentries[tid].depends.intersection(recursivetasks): 826 deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends)) 827 for newtid in deps: 828 for taskname in tasknames: 829 if not newtid.endswith(":" + taskname): 830 continue 831 if newtid in self.runtaskentries: 832 extradeps += 1 833 self.runtaskentries[tid].depends.add(newtid) 834 835 bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps) 836 837 # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work 838 for tid in recursivetasksselfref: 839 self.runtaskentries[tid].depends.difference_update(recursivetasksselfref) 840 841 self.init_progress_reporter.next_stage() 842 843 #self.dump_data() 844 845 # Step B - Mark all active tasks 846 # 847 # Start with the tasks we were asked to run and mark all dependencies 848 # as active too. If the task is to be 'forced', clear its stamp. Once 849 # all active tasks are marked, prune the ones we don't need. 850 851 logger.verbose("Marking Active Tasks") 852 853 def mark_active(tid, depth): 854 """ 855 Mark an item as active along with its depends 856 (calls itself recursively) 857 """ 858 859 if tid in runq_build: 860 return 861 862 runq_build[tid] = 1 863 864 depends = self.runtaskentries[tid].depends 865 for depend in depends: 866 mark_active(depend, depth+1) 867 868 def invalidate_task(tid, error_nostamp): 869 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 870 taskdep = self.dataCaches[mc].task_deps[taskfn] 871 if fn + ":" + taskname not in taskData[mc].taskentries: 872 logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) 873 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 874 if error_nostamp: 875 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) 876 else: 877 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) 878 else: 879 logger.verbose("Invalidate task %s, %s", taskname, fn) 880 bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn) 881 882 self.target_tids = [] 883 for (mc, target, task, fn) in self.targets: 884 885 if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]: 886 continue 887 888 if target in taskData[mc].failed_deps: 889 continue 890 891 parents = False 892 if task.endswith('-'): 893 parents = True 894 task = task[:-1] 895 896 if fn in taskData[mc].failed_fns: 897 continue 898 899 # fn already has mc prefix 900 tid = fn + ":" + task 901 self.target_tids.append(tid) 902 if tid not in taskData[mc].taskentries: 903 import difflib 904 tasks = [] 905 for x in taskData[mc].taskentries: 906 if x.startswith(fn + ":"): 907 tasks.append(taskname_from_tid(x)) 908 close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7) 909 if close_matches: 910 extra = ". Close matches:\n %s" % "\n ".join(close_matches) 911 else: 912 extra = "" 913 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra)) 914 915 # For tasks called "XXXX-", ony run their dependencies 916 if parents: 917 for i in self.runtaskentries[tid].depends: 918 mark_active(i, 1) 919 else: 920 mark_active(tid, 1) 921 922 self.init_progress_reporter.next_stage() 923 924 # Step C - Prune all inactive tasks 925 # 926 # Once all active tasks are marked, prune the ones we don't need. 927 928 # Handle --runall 929 if self.cooker.configuration.runall: 930 # re-run the mark_active and then drop unused tasks from new list 931 reduced_tasklist = set(self.runtaskentries.keys()) 932 for tid in list(self.runtaskentries.keys()): 933 if tid not in runq_build: 934 reduced_tasklist.remove(tid) 935 runq_build = {} 936 937 for task in self.cooker.configuration.runall: 938 if not task.startswith("do_"): 939 task = "do_{0}".format(task) 940 runall_tids = set() 941 for tid in reduced_tasklist: 942 wanttid = "{0}:{1}".format(fn_from_tid(tid), task) 943 if wanttid in self.runtaskentries: 944 runall_tids.add(wanttid) 945 946 for tid in list(runall_tids): 947 mark_active(tid, 1) 948 if self.cooker.configuration.force: 949 invalidate_task(tid, False) 950 951 delcount = set() 952 for tid in list(self.runtaskentries.keys()): 953 if tid not in runq_build: 954 delcount.add(tid) 955 del self.runtaskentries[tid] 956 957 if self.cooker.configuration.runall: 958 if not self.runtaskentries: 959 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets))) 960 961 self.init_progress_reporter.next_stage() 962 963 # Handle runonly 964 if self.cooker.configuration.runonly: 965 # re-run the mark_active and then drop unused tasks from new list 966 runq_build = {} 967 968 for task in self.cooker.configuration.runonly: 969 if not task.startswith("do_"): 970 task = "do_{0}".format(task) 971 runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task] 972 973 for tid in runonly_tids: 974 mark_active(tid, 1) 975 if self.cooker.configuration.force: 976 invalidate_task(tid, False) 977 978 for tid in list(self.runtaskentries.keys()): 979 if tid not in runq_build: 980 delcount.add(tid) 981 del self.runtaskentries[tid] 982 983 if not self.runtaskentries: 984 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets))) 985 986 # 987 # Step D - Sanity checks and computation 988 # 989 990 # Check to make sure we still have tasks to run 991 if not self.runtaskentries: 992 if not taskData[''].halt: 993 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") 994 else: 995 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") 996 997 logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries)) 998 999 logger.verbose("Assign Weightings") 1000 1001 self.init_progress_reporter.next_stage() 1002 1003 # Generate a list of reverse dependencies to ease future calculations 1004 for tid in self.runtaskentries: 1005 for dep in self.runtaskentries[tid].depends: 1006 self.runtaskentries[dep].revdeps.add(tid) 1007 1008 self.init_progress_reporter.next_stage() 1009 1010 # Identify tasks at the end of dependency chains 1011 # Error on circular dependency loops (length two) 1012 endpoints = [] 1013 for tid in self.runtaskentries: 1014 revdeps = self.runtaskentries[tid].revdeps 1015 if not revdeps: 1016 endpoints.append(tid) 1017 for dep in revdeps: 1018 if dep in self.runtaskentries[tid].depends: 1019 bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep)) 1020 1021 1022 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) 1023 1024 self.init_progress_reporter.next_stage() 1025 1026 # Calculate task weights 1027 # Check of higher length circular dependencies 1028 self.runq_weight = self.calculate_task_weights(endpoints) 1029 1030 self.init_progress_reporter.next_stage() 1031 1032 # Sanity Check - Check for multiple tasks building the same provider 1033 for mc in self.dataCaches: 1034 prov_list = {} 1035 seen_fn = [] 1036 for tid in self.runtaskentries: 1037 (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1038 if taskfn in seen_fn: 1039 continue 1040 if mc != tidmc: 1041 continue 1042 seen_fn.append(taskfn) 1043 for prov in self.dataCaches[mc].fn_provides[taskfn]: 1044 if prov not in prov_list: 1045 prov_list[prov] = [taskfn] 1046 elif taskfn not in prov_list[prov]: 1047 prov_list[prov].append(taskfn) 1048 for prov in prov_list: 1049 if len(prov_list[prov]) < 2: 1050 continue 1051 if prov in self.multi_provider_allowed: 1052 continue 1053 seen_pn = [] 1054 # If two versions of the same PN are being built its fatal, we don't support it. 1055 for fn in prov_list[prov]: 1056 pn = self.dataCaches[mc].pkg_fn[fn] 1057 if pn not in seen_pn: 1058 seen_pn.append(pn) 1059 else: 1060 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) 1061 msgs = ["Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))] 1062 # 1063 # Construct a list of things which uniquely depend on each provider 1064 # since this may help the user figure out which dependency is triggering this warning 1065 # 1066 msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.") 1067 deplist = {} 1068 commondeps = None 1069 for provfn in prov_list[prov]: 1070 deps = set() 1071 for tid in self.runtaskentries: 1072 fn = fn_from_tid(tid) 1073 if fn != provfn: 1074 continue 1075 for dep in self.runtaskentries[tid].revdeps: 1076 fn = fn_from_tid(dep) 1077 if fn == provfn: 1078 continue 1079 deps.add(dep) 1080 if not commondeps: 1081 commondeps = set(deps) 1082 else: 1083 commondeps &= deps 1084 deplist[provfn] = deps 1085 for provfn in deplist: 1086 msgs.append("\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps))) 1087 # 1088 # Construct a list of provides and runtime providers for each recipe 1089 # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC) 1090 # 1091 msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.") 1092 provide_results = {} 1093 rprovide_results = {} 1094 commonprovs = None 1095 commonrprovs = None 1096 for provfn in prov_list[prov]: 1097 provides = set(self.dataCaches[mc].fn_provides[provfn]) 1098 rprovides = set() 1099 for rprovide in self.dataCaches[mc].rproviders: 1100 if provfn in self.dataCaches[mc].rproviders[rprovide]: 1101 rprovides.add(rprovide) 1102 for package in self.dataCaches[mc].packages: 1103 if provfn in self.dataCaches[mc].packages[package]: 1104 rprovides.add(package) 1105 for package in self.dataCaches[mc].packages_dynamic: 1106 if provfn in self.dataCaches[mc].packages_dynamic[package]: 1107 rprovides.add(package) 1108 if not commonprovs: 1109 commonprovs = set(provides) 1110 else: 1111 commonprovs &= provides 1112 provide_results[provfn] = provides 1113 if not commonrprovs: 1114 commonrprovs = set(rprovides) 1115 else: 1116 commonrprovs &= rprovides 1117 rprovide_results[provfn] = rprovides 1118 #msgs.append("\nCommon provides:\n %s" % ("\n ".join(commonprovs))) 1119 #msgs.append("\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs))) 1120 for provfn in prov_list[prov]: 1121 msgs.append("\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs))) 1122 msgs.append("\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs))) 1123 1124 if self.warn_multi_bb: 1125 logger.verbnote("".join(msgs)) 1126 else: 1127 logger.error("".join(msgs)) 1128 1129 self.init_progress_reporter.next_stage() 1130 self.init_progress_reporter.next_stage() 1131 1132 # Iterate over the task list looking for tasks with a 'setscene' function 1133 self.runq_setscene_tids = set() 1134 if not self.cooker.configuration.nosetscene: 1135 for tid in self.runtaskentries: 1136 (mc, fn, taskname, _) = split_tid_mcfn(tid) 1137 setscenetid = tid + "_setscene" 1138 if setscenetid not in taskData[mc].taskentries: 1139 continue 1140 self.runq_setscene_tids.add(tid) 1141 1142 self.init_progress_reporter.next_stage() 1143 1144 # Invalidate task if force mode active 1145 if self.cooker.configuration.force: 1146 for tid in self.target_tids: 1147 invalidate_task(tid, False) 1148 1149 # Invalidate task if invalidate mode active 1150 if self.cooker.configuration.invalidate_stamp: 1151 for tid in self.target_tids: 1152 fn = fn_from_tid(tid) 1153 for st in self.cooker.configuration.invalidate_stamp.split(','): 1154 if not st.startswith("do_"): 1155 st = "do_%s" % st 1156 invalidate_task(fn + ":" + st, True) 1157 1158 self.init_progress_reporter.next_stage() 1159 1160 # Create and print to the logs a virtual/xxxx -> PN (fn) table 1161 for mc in taskData: 1162 virtmap = taskData[mc].get_providermap(prefix="virtual/") 1163 virtpnmap = {} 1164 for v in virtmap: 1165 virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]] 1166 bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v])) 1167 if hasattr(bb.parse.siggen, "tasks_resolved"): 1168 bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc]) 1169 1170 self.init_progress_reporter.next_stage() 1171 1172 bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) 1173 1174 # Iterate over the task list and call into the siggen code 1175 dealtwith = set() 1176 todeal = set(self.runtaskentries) 1177 while todeal: 1178 for tid in todeal.copy(): 1179 if not (self.runtaskentries[tid].depends - dealtwith): 1180 dealtwith.add(tid) 1181 todeal.remove(tid) 1182 self.prepare_task_hash(tid) 1183 1184 bb.parse.siggen.writeout_file_checksum_cache() 1185 1186 #self.dump_data() 1187 return len(self.runtaskentries) 1188 1189 def prepare_task_hash(self, tid): 1190 dc = bb.parse.siggen.get_data_caches(self.dataCaches, mc_from_tid(tid)) 1191 bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, dc) 1192 self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, dc) 1193 self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) 1194 1195 def dump_data(self): 1196 """ 1197 Dump some debug information on the internal data structures 1198 """ 1199 logger.debug3("run_tasks:") 1200 for tid in self.runtaskentries: 1201 logger.debug3(" %s: %s Deps %s RevDeps %s", tid, 1202 self.runtaskentries[tid].weight, 1203 self.runtaskentries[tid].depends, 1204 self.runtaskentries[tid].revdeps) 1205 1206class RunQueueWorker(): 1207 def __init__(self, process, pipe): 1208 self.process = process 1209 self.pipe = pipe 1210 1211class RunQueue: 1212 def __init__(self, cooker, cfgData, dataCaches, taskData, targets): 1213 1214 self.cooker = cooker 1215 self.cfgData = cfgData 1216 self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets) 1217 1218 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None 1219 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None 1220 1221 self.state = runQueuePrepare 1222 1223 # For disk space monitor 1224 # Invoked at regular time intervals via the bitbake heartbeat event 1225 # while the build is running. We generate a unique name for the handler 1226 # here, just in case that there ever is more than one RunQueue instance, 1227 # start the handler when reaching runQueueSceneInit, and stop it when 1228 # done with the build. 1229 self.dm = monitordisk.diskMonitor(cfgData) 1230 self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self)) 1231 self.dm_event_handler_registered = False 1232 self.rqexe = None 1233 self.worker = {} 1234 self.fakeworker = {} 1235 1236 def _start_worker(self, mc, fakeroot = False, rqexec = None): 1237 logger.debug("Starting bitbake-worker") 1238 magic = "decafbad" 1239 if self.cooker.configuration.profile: 1240 magic = "decafbadbad" 1241 fakerootlogs = None 1242 if fakeroot: 1243 magic = magic + "beef" 1244 mcdata = self.cooker.databuilder.mcdata[mc] 1245 fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD")) 1246 fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split() 1247 env = os.environ.copy() 1248 for key, value in (var.split('=') for var in fakerootenv): 1249 env[key] = value 1250 worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) 1251 fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs 1252 else: 1253 worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) 1254 bb.utils.nonblockingfd(worker.stdout) 1255 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs) 1256 1257 workerdata = { 1258 "taskdeps" : self.rqdata.dataCaches[mc].task_deps, 1259 "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv, 1260 "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs, 1261 "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv, 1262 "sigdata" : bb.parse.siggen.get_taskdata(), 1263 "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, 1264 "build_verbose_shell" : self.cooker.configuration.build_verbose_shell, 1265 "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout, 1266 "logdefaultdomain" : bb.msg.loggerDefaultDomains, 1267 "prhost" : self.cooker.prhost, 1268 "buildname" : self.cfgData.getVar("BUILDNAME"), 1269 "date" : self.cfgData.getVar("DATE"), 1270 "time" : self.cfgData.getVar("TIME"), 1271 "hashservaddr" : self.cooker.hashservaddr, 1272 "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), 1273 } 1274 1275 worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") 1276 worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") 1277 worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") 1278 worker.stdin.flush() 1279 1280 return RunQueueWorker(worker, workerpipe) 1281 1282 def _teardown_worker(self, worker): 1283 if not worker: 1284 return 1285 logger.debug("Teardown for bitbake-worker") 1286 try: 1287 worker.process.stdin.write(b"<quit></quit>") 1288 worker.process.stdin.flush() 1289 worker.process.stdin.close() 1290 except IOError: 1291 pass 1292 while worker.process.returncode is None: 1293 worker.pipe.read() 1294 worker.process.poll() 1295 while worker.pipe.read(): 1296 continue 1297 worker.pipe.close() 1298 1299 def start_worker(self): 1300 if self.worker: 1301 self.teardown_workers() 1302 self.teardown = False 1303 for mc in self.rqdata.dataCaches: 1304 self.worker[mc] = self._start_worker(mc) 1305 1306 def start_fakeworker(self, rqexec, mc): 1307 if not mc in self.fakeworker: 1308 self.fakeworker[mc] = self._start_worker(mc, True, rqexec) 1309 1310 def teardown_workers(self): 1311 self.teardown = True 1312 for mc in self.worker: 1313 self._teardown_worker(self.worker[mc]) 1314 self.worker = {} 1315 for mc in self.fakeworker: 1316 self._teardown_worker(self.fakeworker[mc]) 1317 self.fakeworker = {} 1318 1319 def read_workers(self): 1320 for mc in self.worker: 1321 self.worker[mc].pipe.read() 1322 for mc in self.fakeworker: 1323 self.fakeworker[mc].pipe.read() 1324 1325 def active_fds(self): 1326 fds = [] 1327 for mc in self.worker: 1328 fds.append(self.worker[mc].pipe.input) 1329 for mc in self.fakeworker: 1330 fds.append(self.fakeworker[mc].pipe.input) 1331 return fds 1332 1333 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): 1334 def get_timestamp(f): 1335 try: 1336 if not os.access(f, os.F_OK): 1337 return None 1338 return os.stat(f)[stat.ST_MTIME] 1339 except: 1340 return None 1341 1342 (mc, fn, tn, taskfn) = split_tid_mcfn(tid) 1343 if taskname is None: 1344 taskname = tn 1345 1346 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn) 1347 1348 # If the stamp is missing, it's not current 1349 if not os.access(stampfile, os.F_OK): 1350 logger.debug2("Stampfile %s not available", stampfile) 1351 return False 1352 # If it's a 'nostamp' task, it's not current 1353 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1354 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 1355 logger.debug2("%s.%s is nostamp\n", fn, taskname) 1356 return False 1357 1358 if taskname != "do_setscene" and taskname.endswith("_setscene"): 1359 return True 1360 1361 if cache is None: 1362 cache = {} 1363 1364 iscurrent = True 1365 t1 = get_timestamp(stampfile) 1366 for dep in self.rqdata.runtaskentries[tid].depends: 1367 if iscurrent: 1368 (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep) 1369 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2) 1370 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2) 1371 t2 = get_timestamp(stampfile2) 1372 t3 = get_timestamp(stampfile3) 1373 if t3 and not t2: 1374 continue 1375 if t3 and t3 > t2: 1376 continue 1377 if fn == fn2: 1378 if not t2: 1379 logger.debug2('Stampfile %s does not exist', stampfile2) 1380 iscurrent = False 1381 break 1382 if t1 < t2: 1383 logger.debug2('Stampfile %s < %s', stampfile, stampfile2) 1384 iscurrent = False 1385 break 1386 if recurse and iscurrent: 1387 if dep in cache: 1388 iscurrent = cache[dep] 1389 if not iscurrent: 1390 logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) 1391 else: 1392 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) 1393 cache[dep] = iscurrent 1394 if recurse: 1395 cache[tid] = iscurrent 1396 return iscurrent 1397 1398 def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True): 1399 valid = set() 1400 if self.hashvalidate: 1401 sq_data = {} 1402 sq_data['hash'] = {} 1403 sq_data['hashfn'] = {} 1404 sq_data['unihash'] = {} 1405 for tid in tocheck: 1406 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1407 sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash 1408 sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn] 1409 sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash 1410 1411 valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary) 1412 1413 return valid 1414 1415 def validate_hash(self, sq_data, d, siginfo, currentcount, summary): 1416 locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary} 1417 1418 # Metadata has **kwargs so args can be added, sq_data can also gain new fields 1419 call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)" 1420 1421 return bb.utils.better_eval(call, locs) 1422 1423 def _execute_runqueue(self): 1424 """ 1425 Run the tasks in a queue prepared by rqdata.prepare() 1426 Upon failure, optionally try to recover the build using any alternate providers 1427 (if the halt on failure configuration option isn't set) 1428 """ 1429 1430 retval = True 1431 1432 if self.state is runQueuePrepare: 1433 # NOTE: if you add, remove or significantly refactor the stages of this 1434 # process then you should recalculate the weightings here. This is quite 1435 # easy to do - just change the next line temporarily to pass debug=True as 1436 # the last parameter and you'll get a printout of the weightings as well 1437 # as a map to the lines where next_stage() was called. Of course this isn't 1438 # critical, but it helps to keep the progress reporting accurate. 1439 self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data, 1440 "Initialising tasks", 1441 [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244]) 1442 if self.rqdata.prepare() == 0: 1443 self.state = runQueueComplete 1444 else: 1445 self.state = runQueueSceneInit 1446 bb.parse.siggen.save_unitaskhashes() 1447 1448 if self.state is runQueueSceneInit: 1449 self.rqdata.init_progress_reporter.next_stage() 1450 1451 # we are ready to run, emit dependency info to any UI or class which 1452 # needs it 1453 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) 1454 self.rqdata.init_progress_reporter.next_stage() 1455 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) 1456 1457 if not self.dm_event_handler_registered: 1458 res = bb.event.register(self.dm_event_handler_name, 1459 lambda x: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False, 1460 ('bb.event.HeartbeatEvent',), data=self.cfgData) 1461 self.dm_event_handler_registered = True 1462 1463 dump = self.cooker.configuration.dump_signatures 1464 if dump: 1465 self.rqdata.init_progress_reporter.finish() 1466 if 'printdiff' in dump: 1467 invalidtasks = self.print_diffscenetasks() 1468 self.dump_signatures(dump) 1469 if 'printdiff' in dump: 1470 self.write_diffscenetasks(invalidtasks) 1471 self.state = runQueueComplete 1472 1473 if self.state is runQueueSceneInit: 1474 self.rqdata.init_progress_reporter.next_stage() 1475 self.start_worker() 1476 self.rqdata.init_progress_reporter.next_stage() 1477 self.rqexe = RunQueueExecute(self) 1478 1479 # If we don't have any setscene functions, skip execution 1480 if not self.rqdata.runq_setscene_tids: 1481 logger.info('No setscene tasks') 1482 for tid in self.rqdata.runtaskentries: 1483 if not self.rqdata.runtaskentries[tid].depends: 1484 self.rqexe.setbuildable(tid) 1485 self.rqexe.tasks_notcovered.add(tid) 1486 self.rqexe.sqdone = True 1487 logger.info('Executing Tasks') 1488 self.state = runQueueRunning 1489 1490 if self.state is runQueueRunning: 1491 retval = self.rqexe.execute() 1492 1493 if self.state is runQueueCleanUp: 1494 retval = self.rqexe.finish() 1495 1496 build_done = self.state is runQueueComplete or self.state is runQueueFailed 1497 1498 if build_done and self.dm_event_handler_registered: 1499 bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData) 1500 self.dm_event_handler_registered = False 1501 1502 if build_done and self.rqexe: 1503 bb.parse.siggen.save_unitaskhashes() 1504 self.teardown_workers() 1505 if self.rqexe: 1506 if self.rqexe.stats.failed: 1507 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) 1508 else: 1509 # Let's avoid the word "failed" if nothing actually did 1510 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) 1511 1512 if self.state is runQueueFailed: 1513 raise bb.runqueue.TaskFailure(self.rqexe.failed_tids) 1514 1515 if self.state is runQueueComplete: 1516 # All done 1517 return False 1518 1519 # Loop 1520 return retval 1521 1522 def execute_runqueue(self): 1523 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop. 1524 try: 1525 return self._execute_runqueue() 1526 except bb.runqueue.TaskFailure: 1527 raise 1528 except SystemExit: 1529 raise 1530 except bb.BBHandledException: 1531 try: 1532 self.teardown_workers() 1533 except: 1534 pass 1535 self.state = runQueueComplete 1536 raise 1537 except Exception as err: 1538 logger.exception("An uncaught exception occurred in runqueue") 1539 try: 1540 self.teardown_workers() 1541 except: 1542 pass 1543 self.state = runQueueComplete 1544 raise 1545 1546 def finish_runqueue(self, now = False): 1547 if not self.rqexe: 1548 self.state = runQueueComplete 1549 return 1550 1551 if now: 1552 self.rqexe.finish_now() 1553 else: 1554 self.rqexe.finish() 1555 1556 def rq_dump_sigfn(self, fn, options): 1557 bb_cache = bb.cache.NoCache(self.cooker.databuilder) 1558 mc = bb.runqueue.mc_from_tid(fn) 1559 the_data = bb_cache.loadDataFull(fn, self.cooker.collections[mc].get_file_appends(fn)) 1560 siggen = bb.parse.siggen 1561 dataCaches = self.rqdata.dataCaches 1562 siggen.dump_sigfn(fn, dataCaches, options) 1563 1564 def dump_signatures(self, options): 1565 fns = set() 1566 bb.note("Reparsing files to collect dependency data") 1567 1568 for tid in self.rqdata.runtaskentries: 1569 fn = fn_from_tid(tid) 1570 fns.add(fn) 1571 1572 max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1) 1573 # We cannot use the real multiprocessing.Pool easily due to some local data 1574 # that can't be pickled. This is a cheap multi-process solution. 1575 launched = [] 1576 while fns: 1577 if len(launched) < max_process: 1578 p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options)) 1579 p.start() 1580 launched.append(p) 1581 for q in launched: 1582 # The finished processes are joined when calling is_alive() 1583 if not q.is_alive(): 1584 launched.remove(q) 1585 for p in launched: 1586 p.join() 1587 1588 bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options) 1589 1590 return 1591 1592 def print_diffscenetasks(self): 1593 1594 noexec = [] 1595 tocheck = set() 1596 1597 for tid in self.rqdata.runtaskentries: 1598 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1599 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1600 1601 if 'noexec' in taskdep and taskname in taskdep['noexec']: 1602 noexec.append(tid) 1603 continue 1604 1605 tocheck.add(tid) 1606 1607 valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False) 1608 1609 # Tasks which are both setscene and noexec never care about dependencies 1610 # We therefore find tasks which are setscene and noexec and mark their 1611 # unique dependencies as valid. 1612 for tid in noexec: 1613 if tid not in self.rqdata.runq_setscene_tids: 1614 continue 1615 for dep in self.rqdata.runtaskentries[tid].depends: 1616 hasnoexecparents = True 1617 for dep2 in self.rqdata.runtaskentries[dep].revdeps: 1618 if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec: 1619 continue 1620 hasnoexecparents = False 1621 break 1622 if hasnoexecparents: 1623 valid_new.add(dep) 1624 1625 invalidtasks = set() 1626 for tid in self.rqdata.runtaskentries: 1627 if tid not in valid_new and tid not in noexec: 1628 invalidtasks.add(tid) 1629 1630 found = set() 1631 processed = set() 1632 for tid in invalidtasks: 1633 toprocess = set([tid]) 1634 while toprocess: 1635 next = set() 1636 for t in toprocess: 1637 for dep in self.rqdata.runtaskentries[t].depends: 1638 if dep in invalidtasks: 1639 found.add(tid) 1640 if dep not in processed: 1641 processed.add(dep) 1642 next.add(dep) 1643 toprocess = next 1644 if tid in found: 1645 toprocess = set() 1646 1647 tasklist = [] 1648 for tid in invalidtasks.difference(found): 1649 tasklist.append(tid) 1650 1651 if tasklist: 1652 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) 1653 1654 return invalidtasks.difference(found) 1655 1656 def write_diffscenetasks(self, invalidtasks): 1657 1658 # Define recursion callback 1659 def recursecb(key, hash1, hash2): 1660 hashes = [hash1, hash2] 1661 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) 1662 1663 recout = [] 1664 if len(hashfiles) == 2: 1665 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb) 1666 recout.extend(list(' ' + l for l in out2)) 1667 else: 1668 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) 1669 1670 return recout 1671 1672 1673 for tid in invalidtasks: 1674 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1675 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1676 h = self.rqdata.runtaskentries[tid].hash 1677 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc]) 1678 match = None 1679 for m in matches: 1680 if h in m: 1681 match = m 1682 if match is None: 1683 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h) 1684 matches = {k : v for k, v in iter(matches.items()) if h not in k} 1685 if matches: 1686 latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1] 1687 prevh = __find_sha256__.search(latestmatch).group(0) 1688 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) 1689 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) 1690 1691 1692class RunQueueExecute: 1693 1694 def __init__(self, rq): 1695 self.rq = rq 1696 self.cooker = rq.cooker 1697 self.cfgData = rq.cfgData 1698 self.rqdata = rq.rqdata 1699 1700 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) 1701 self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" 1702 1703 self.sq_buildable = set() 1704 self.sq_running = set() 1705 self.sq_live = set() 1706 1707 self.updated_taskhash_queue = [] 1708 self.pending_migrations = set() 1709 1710 self.runq_buildable = set() 1711 self.runq_running = set() 1712 self.runq_complete = set() 1713 self.runq_tasksrun = set() 1714 1715 self.build_stamps = {} 1716 self.build_stamps2 = [] 1717 self.failed_tids = [] 1718 self.sq_deferred = {} 1719 1720 self.stampcache = {} 1721 1722 self.holdoff_tasks = set() 1723 self.holdoff_need_update = True 1724 self.sqdone = False 1725 1726 self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids)) 1727 1728 for mc in rq.worker: 1729 rq.worker[mc].pipe.setrunqueueexec(self) 1730 for mc in rq.fakeworker: 1731 rq.fakeworker[mc].pipe.setrunqueueexec(self) 1732 1733 if self.number_tasks <= 0: 1734 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) 1735 1736 # List of setscene tasks which we've covered 1737 self.scenequeue_covered = set() 1738 # List of tasks which are covered (including setscene ones) 1739 self.tasks_covered = set() 1740 self.tasks_scenequeue_done = set() 1741 self.scenequeue_notcovered = set() 1742 self.tasks_notcovered = set() 1743 self.scenequeue_notneeded = set() 1744 1745 # We can't skip specified target tasks which aren't setscene tasks 1746 self.cantskip = set(self.rqdata.target_tids) 1747 self.cantskip.difference_update(self.rqdata.runq_setscene_tids) 1748 self.cantskip.intersection_update(self.rqdata.runtaskentries) 1749 1750 schedulers = self.get_schedulers() 1751 for scheduler in schedulers: 1752 if self.scheduler == scheduler.name: 1753 self.sched = scheduler(self, self.rqdata) 1754 logger.debug("Using runqueue scheduler '%s'", scheduler.name) 1755 break 1756 else: 1757 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1758 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1759 1760 #if self.rqdata.runq_setscene_tids: 1761 self.sqdata = SQData() 1762 build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self) 1763 1764 def runqueue_process_waitpid(self, task, status, fakerootlog=None): 1765 1766 # self.build_stamps[pid] may not exist when use shared work directory. 1767 if task in self.build_stamps: 1768 self.build_stamps2.remove(self.build_stamps[task]) 1769 del self.build_stamps[task] 1770 1771 if task in self.sq_live: 1772 if status != 0: 1773 self.sq_task_fail(task, status) 1774 else: 1775 self.sq_task_complete(task) 1776 self.sq_live.remove(task) 1777 self.stats.updateActiveSetscene(len(self.sq_live)) 1778 else: 1779 if status != 0: 1780 self.task_fail(task, status, fakerootlog=fakerootlog) 1781 else: 1782 self.task_complete(task) 1783 return True 1784 1785 def finish_now(self): 1786 for mc in self.rq.worker: 1787 try: 1788 self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") 1789 self.rq.worker[mc].process.stdin.flush() 1790 except IOError: 1791 # worker must have died? 1792 pass 1793 for mc in self.rq.fakeworker: 1794 try: 1795 self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") 1796 self.rq.fakeworker[mc].process.stdin.flush() 1797 except IOError: 1798 # worker must have died? 1799 pass 1800 1801 if self.failed_tids: 1802 self.rq.state = runQueueFailed 1803 return 1804 1805 self.rq.state = runQueueComplete 1806 return 1807 1808 def finish(self): 1809 self.rq.state = runQueueCleanUp 1810 1811 active = self.stats.active + len(self.sq_live) 1812 if active > 0: 1813 bb.event.fire(runQueueExitWait(active), self.cfgData) 1814 self.rq.read_workers() 1815 return self.rq.active_fds() 1816 1817 if self.failed_tids: 1818 self.rq.state = runQueueFailed 1819 return True 1820 1821 self.rq.state = runQueueComplete 1822 return True 1823 1824 # Used by setscene only 1825 def check_dependencies(self, task, taskdeps): 1826 if not self.rq.depvalidate: 1827 return False 1828 1829 # Must not edit parent data 1830 taskdeps = set(taskdeps) 1831 1832 taskdata = {} 1833 taskdeps.add(task) 1834 for dep in taskdeps: 1835 (mc, fn, taskname, taskfn) = split_tid_mcfn(dep) 1836 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1837 taskdata[dep] = [pn, taskname, fn] 1838 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)" 1839 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data } 1840 valid = bb.utils.better_eval(call, locs) 1841 return valid 1842 1843 def can_start_task(self): 1844 active = self.stats.active + len(self.sq_live) 1845 can_start = active < self.number_tasks 1846 return can_start 1847 1848 def get_schedulers(self): 1849 schedulers = set(obj for obj in globals().values() 1850 if type(obj) is type and 1851 issubclass(obj, RunQueueScheduler)) 1852 1853 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS") 1854 if user_schedulers: 1855 for sched in user_schedulers.split(): 1856 if not "." in sched: 1857 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) 1858 continue 1859 1860 modname, name = sched.rsplit(".", 1) 1861 try: 1862 module = __import__(modname, fromlist=(name,)) 1863 except ImportError as exc: 1864 logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) 1865 raise SystemExit(1) 1866 else: 1867 schedulers.add(getattr(module, name)) 1868 return schedulers 1869 1870 def setbuildable(self, task): 1871 self.runq_buildable.add(task) 1872 self.sched.newbuildable(task) 1873 1874 def task_completeoutright(self, task): 1875 """ 1876 Mark a task as completed 1877 Look at the reverse dependencies and mark any task with 1878 completed dependencies as buildable 1879 """ 1880 self.runq_complete.add(task) 1881 for revdep in self.rqdata.runtaskentries[task].revdeps: 1882 if revdep in self.runq_running: 1883 continue 1884 if revdep in self.runq_buildable: 1885 continue 1886 alldeps = True 1887 for dep in self.rqdata.runtaskentries[revdep].depends: 1888 if dep not in self.runq_complete: 1889 alldeps = False 1890 break 1891 if alldeps: 1892 self.setbuildable(revdep) 1893 logger.debug("Marking task %s as buildable", revdep) 1894 1895 for t in self.sq_deferred.copy(): 1896 if self.sq_deferred[t] == task: 1897 logger.debug2("Deferred task %s now buildable" % t) 1898 del self.sq_deferred[t] 1899 update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 1900 1901 def task_complete(self, task): 1902 self.stats.taskCompleted() 1903 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 1904 self.task_completeoutright(task) 1905 self.runq_tasksrun.add(task) 1906 1907 def task_fail(self, task, exitcode, fakerootlog=None): 1908 """ 1909 Called when a task has failed 1910 Updates the state engine with the failure 1911 """ 1912 self.stats.taskFailed() 1913 self.failed_tids.append(task) 1914 1915 fakeroot_log = [] 1916 if fakerootlog and os.path.exists(fakerootlog): 1917 with open(fakerootlog) as fakeroot_log_file: 1918 fakeroot_failed = False 1919 for line in reversed(fakeroot_log_file.readlines()): 1920 for fakeroot_error in ['mismatch', 'error', 'fatal']: 1921 if fakeroot_error in line.lower(): 1922 fakeroot_failed = True 1923 if 'doing new pid setup and server start' in line: 1924 break 1925 fakeroot_log.append(line) 1926 1927 if not fakeroot_failed: 1928 fakeroot_log = [] 1929 1930 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData) 1931 1932 if self.rqdata.taskData[''].halt: 1933 self.rq.state = runQueueCleanUp 1934 1935 def task_skip(self, task, reason): 1936 self.runq_running.add(task) 1937 self.setbuildable(task) 1938 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData) 1939 self.task_completeoutright(task) 1940 self.stats.taskSkipped() 1941 self.stats.taskCompleted() 1942 1943 def summarise_scenequeue_errors(self): 1944 err = False 1945 if not self.sqdone: 1946 logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) 1947 completeevent = sceneQueueComplete(self.stats, self.rq) 1948 bb.event.fire(completeevent, self.cfgData) 1949 if self.sq_deferred: 1950 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) 1951 err = True 1952 if self.updated_taskhash_queue: 1953 logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) 1954 err = True 1955 if self.holdoff_tasks: 1956 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) 1957 err = True 1958 1959 for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered): 1960 # No task should end up in both covered and uncovered, that is a bug. 1961 logger.error("Setscene task %s in both covered and notcovered." % tid) 1962 1963 for tid in self.rqdata.runq_setscene_tids: 1964 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 1965 err = True 1966 logger.error("Setscene Task %s was never marked as covered or not covered" % tid) 1967 if tid not in self.sq_buildable: 1968 err = True 1969 logger.error("Setscene Task %s was never marked as buildable" % tid) 1970 if tid not in self.sq_running: 1971 err = True 1972 logger.error("Setscene Task %s was never marked as running" % tid) 1973 1974 for x in self.rqdata.runtaskentries: 1975 if x not in self.tasks_covered and x not in self.tasks_notcovered: 1976 logger.error("Task %s was never moved from the setscene queue" % x) 1977 err = True 1978 if x not in self.tasks_scenequeue_done: 1979 logger.error("Task %s was never processed by the setscene code" % x) 1980 err = True 1981 if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable: 1982 logger.error("Task %s was never marked as buildable by the setscene code" % x) 1983 err = True 1984 return err 1985 1986 1987 def execute(self): 1988 """ 1989 Run the tasks in a queue prepared by prepare_runqueue 1990 """ 1991 1992 self.rq.read_workers() 1993 if self.updated_taskhash_queue or self.pending_migrations: 1994 self.process_possible_migrations() 1995 1996 if not hasattr(self, "sorted_setscene_tids"): 1997 # Don't want to sort this set every execution 1998 self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids) 1999 2000 task = None 2001 if not self.sqdone and self.can_start_task(): 2002 # Find the next setscene to run 2003 for nexttask in self.sorted_setscene_tids: 2004 if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values(): 2005 if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): 2006 if nexttask not in self.rqdata.target_tids: 2007 logger.debug2("Skipping setscene for task %s" % nexttask) 2008 self.sq_task_skip(nexttask) 2009 self.scenequeue_notneeded.add(nexttask) 2010 if nexttask in self.sq_deferred: 2011 del self.sq_deferred[nexttask] 2012 return True 2013 # If covered tasks are running, need to wait for them to complete 2014 for t in self.sqdata.sq_covered_tasks[nexttask]: 2015 if t in self.runq_running and t not in self.runq_complete: 2016 continue 2017 if nexttask in self.sq_deferred: 2018 if self.sq_deferred[nexttask] not in self.runq_complete: 2019 continue 2020 logger.debug("Task %s no longer deferred" % nexttask) 2021 del self.sq_deferred[nexttask] 2022 valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False) 2023 if not valid: 2024 logger.debug("%s didn't become valid, skipping setscene" % nexttask) 2025 self.sq_task_failoutright(nexttask) 2026 return True 2027 if nexttask in self.sqdata.outrightfail: 2028 logger.debug2('No package found, so skipping setscene task %s', nexttask) 2029 self.sq_task_failoutright(nexttask) 2030 return True 2031 if nexttask in self.sqdata.unskippable: 2032 logger.debug2("Setscene task %s is unskippable" % nexttask) 2033 task = nexttask 2034 break 2035 if task is not None: 2036 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2037 taskname = taskname + "_setscene" 2038 if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): 2039 logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task) 2040 self.sq_task_failoutright(task) 2041 return True 2042 2043 if self.cooker.configuration.force: 2044 if task in self.rqdata.target_tids: 2045 self.sq_task_failoutright(task) 2046 return True 2047 2048 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2049 logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task) 2050 self.sq_task_skip(task) 2051 return True 2052 2053 if self.cooker.configuration.skipsetscene: 2054 logger.debug2('No setscene tasks should be executed. Skipping %s', task) 2055 self.sq_task_failoutright(task) 2056 return True 2057 2058 startevent = sceneQueueTaskStarted(task, self.stats, self.rq) 2059 bb.event.fire(startevent, self.cfgData) 2060 2061 taskdepdata = self.sq_build_taskdepdata(task) 2062 2063 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2064 taskhash = self.rqdata.get_task_hash(task) 2065 unihash = self.rqdata.get_task_unihash(task) 2066 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2067 if not mc in self.rq.fakeworker: 2068 self.rq.start_fakeworker(self, mc) 2069 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") 2070 self.rq.fakeworker[mc].process.stdin.flush() 2071 else: 2072 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") 2073 self.rq.worker[mc].process.stdin.flush() 2074 2075 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 2076 self.build_stamps2.append(self.build_stamps[task]) 2077 self.sq_running.add(task) 2078 self.sq_live.add(task) 2079 self.stats.updateActiveSetscene(len(self.sq_live)) 2080 if self.can_start_task(): 2081 return True 2082 2083 self.update_holdofftasks() 2084 2085 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: 2086 hashequiv_logger.verbose("Setscene tasks completed") 2087 2088 err = self.summarise_scenequeue_errors() 2089 if err: 2090 self.rq.state = runQueueFailed 2091 return True 2092 2093 if self.cooker.configuration.setsceneonly: 2094 self.rq.state = runQueueComplete 2095 return True 2096 self.sqdone = True 2097 2098 if self.stats.total == 0: 2099 # nothing to do 2100 self.rq.state = runQueueComplete 2101 return True 2102 2103 if self.cooker.configuration.setsceneonly: 2104 task = None 2105 else: 2106 task = self.sched.next() 2107 if task is not None: 2108 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2109 2110 if self.rqdata.setscene_ignore_tasks is not None: 2111 if self.check_setscene_ignore_tasks(task): 2112 self.task_fail(task, "setscene ignore_tasks") 2113 return True 2114 2115 if task in self.tasks_covered: 2116 logger.debug2("Setscene covered task %s", task) 2117 self.task_skip(task, "covered") 2118 return True 2119 2120 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2121 logger.debug2("Stamp current task %s", task) 2122 2123 self.task_skip(task, "existing") 2124 self.runq_tasksrun.add(task) 2125 return True 2126 2127 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2128 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2129 startevent = runQueueTaskStarted(task, self.stats, self.rq, 2130 noexec=True) 2131 bb.event.fire(startevent, self.cfgData) 2132 self.runq_running.add(task) 2133 self.stats.taskActive() 2134 if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2135 bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn) 2136 self.task_complete(task) 2137 return True 2138 else: 2139 startevent = runQueueTaskStarted(task, self.stats, self.rq) 2140 bb.event.fire(startevent, self.cfgData) 2141 2142 taskdepdata = self.build_taskdepdata(task) 2143 2144 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2145 taskhash = self.rqdata.get_task_hash(task) 2146 unihash = self.rqdata.get_task_unihash(task) 2147 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2148 if not mc in self.rq.fakeworker: 2149 try: 2150 self.rq.start_fakeworker(self, mc) 2151 except OSError as exc: 2152 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) 2153 self.rq.state = runQueueFailed 2154 self.stats.taskFailed() 2155 return True 2156 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") 2157 self.rq.fakeworker[mc].process.stdin.flush() 2158 else: 2159 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") 2160 self.rq.worker[mc].process.stdin.flush() 2161 2162 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 2163 self.build_stamps2.append(self.build_stamps[task]) 2164 self.runq_running.add(task) 2165 self.stats.taskActive() 2166 if self.can_start_task(): 2167 return True 2168 2169 if self.stats.active > 0 or self.sq_live: 2170 self.rq.read_workers() 2171 return self.rq.active_fds() 2172 2173 # No more tasks can be run. If we have deferred setscene tasks we should run them. 2174 if self.sq_deferred: 2175 tid = self.sq_deferred.pop(list(self.sq_deferred.keys())[0]) 2176 logger.warning("Runqeueue deadlocked on deferred tasks, forcing task %s" % tid) 2177 if tid not in self.runq_complete: 2178 self.sq_task_failoutright(tid) 2179 return True 2180 2181 if self.failed_tids: 2182 self.rq.state = runQueueFailed 2183 return True 2184 2185 # Sanity Checks 2186 err = self.summarise_scenequeue_errors() 2187 for task in self.rqdata.runtaskentries: 2188 if task not in self.runq_buildable: 2189 logger.error("Task %s never buildable!", task) 2190 err = True 2191 elif task not in self.runq_running: 2192 logger.error("Task %s never ran!", task) 2193 err = True 2194 elif task not in self.runq_complete: 2195 logger.error("Task %s never completed!", task) 2196 err = True 2197 2198 if err: 2199 self.rq.state = runQueueFailed 2200 else: 2201 self.rq.state = runQueueComplete 2202 2203 return True 2204 2205 def filtermcdeps(self, task, mc, deps): 2206 ret = set() 2207 for dep in deps: 2208 thismc = mc_from_tid(dep) 2209 if thismc != mc: 2210 continue 2211 ret.add(dep) 2212 return ret 2213 2214 # We filter out multiconfig dependencies from taskdepdata we pass to the tasks 2215 # as most code can't handle them 2216 def build_taskdepdata(self, task): 2217 taskdepdata = {} 2218 mc = mc_from_tid(task) 2219 next = self.rqdata.runtaskentries[task].depends.copy() 2220 next.add(task) 2221 next = self.filtermcdeps(task, mc, next) 2222 while next: 2223 additional = [] 2224 for revdep in next: 2225 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2226 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2227 deps = self.rqdata.runtaskentries[revdep].depends 2228 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2229 taskhash = self.rqdata.runtaskentries[revdep].hash 2230 unihash = self.rqdata.runtaskentries[revdep].unihash 2231 deps = self.filtermcdeps(task, mc, deps) 2232 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] 2233 for revdep2 in deps: 2234 if revdep2 not in taskdepdata: 2235 additional.append(revdep2) 2236 next = additional 2237 2238 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2239 return taskdepdata 2240 2241 def update_holdofftasks(self): 2242 2243 if not self.holdoff_need_update: 2244 return 2245 2246 notcovered = set(self.scenequeue_notcovered) 2247 notcovered |= self.cantskip 2248 for tid in self.scenequeue_notcovered: 2249 notcovered |= self.sqdata.sq_covered_tasks[tid] 2250 notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) 2251 notcovered.intersection_update(self.tasks_scenequeue_done) 2252 2253 covered = set(self.scenequeue_covered) 2254 for tid in self.scenequeue_covered: 2255 covered |= self.sqdata.sq_covered_tasks[tid] 2256 covered.difference_update(notcovered) 2257 covered.intersection_update(self.tasks_scenequeue_done) 2258 2259 for tid in notcovered | covered: 2260 if not self.rqdata.runtaskentries[tid].depends: 2261 self.setbuildable(tid) 2262 elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): 2263 self.setbuildable(tid) 2264 2265 self.tasks_covered = covered 2266 self.tasks_notcovered = notcovered 2267 2268 self.holdoff_tasks = set() 2269 2270 for tid in self.rqdata.runq_setscene_tids: 2271 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2272 self.holdoff_tasks.add(tid) 2273 2274 for tid in self.holdoff_tasks.copy(): 2275 for dep in self.sqdata.sq_covered_tasks[tid]: 2276 if dep not in self.runq_complete: 2277 self.holdoff_tasks.add(dep) 2278 2279 self.holdoff_need_update = False 2280 2281 def process_possible_migrations(self): 2282 2283 changed = set() 2284 toprocess = set() 2285 for tid, unihash in self.updated_taskhash_queue.copy(): 2286 if tid in self.runq_running and tid not in self.runq_complete: 2287 continue 2288 2289 self.updated_taskhash_queue.remove((tid, unihash)) 2290 2291 if unihash != self.rqdata.runtaskentries[tid].unihash: 2292 # Make sure we rehash any other tasks with the same task hash that we're deferred against. 2293 torehash = [tid] 2294 for deftid in self.sq_deferred: 2295 if self.sq_deferred[deftid] == tid: 2296 torehash.append(deftid) 2297 for hashtid in torehash: 2298 hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) 2299 self.rqdata.runtaskentries[hashtid].unihash = unihash 2300 bb.parse.siggen.set_unihash(hashtid, unihash) 2301 toprocess.add(hashtid) 2302 if torehash: 2303 # Need to save after set_unihash above 2304 bb.parse.siggen.save_unitaskhashes() 2305 2306 # Work out all tasks which depend upon these 2307 total = set() 2308 next = set() 2309 for p in toprocess: 2310 next |= self.rqdata.runtaskentries[p].revdeps 2311 while next: 2312 current = next.copy() 2313 total = total | next 2314 next = set() 2315 for ntid in current: 2316 next |= self.rqdata.runtaskentries[ntid].revdeps 2317 next.difference_update(total) 2318 2319 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash 2320 next = set() 2321 for p in total: 2322 if not self.rqdata.runtaskentries[p].depends: 2323 next.add(p) 2324 elif self.rqdata.runtaskentries[p].depends.isdisjoint(total): 2325 next.add(p) 2326 2327 # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled 2328 while next: 2329 current = next.copy() 2330 next = set() 2331 for tid in current: 2332 if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): 2333 continue 2334 orighash = self.rqdata.runtaskentries[tid].hash 2335 dc = bb.parse.siggen.get_data_caches(self.rqdata.dataCaches, mc_from_tid(tid)) 2336 newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, dc) 2337 origuni = self.rqdata.runtaskentries[tid].unihash 2338 newuni = bb.parse.siggen.get_unihash(tid) 2339 # FIXME, need to check it can come from sstate at all for determinism? 2340 remapped = False 2341 if newuni == origuni: 2342 # Nothing to do, we match, skip code below 2343 remapped = True 2344 elif tid in self.scenequeue_covered or tid in self.sq_live: 2345 # Already ran this setscene task or it running. Report the new taskhash 2346 bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches) 2347 hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid)) 2348 remapped = True 2349 2350 if not remapped: 2351 #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni)) 2352 self.rqdata.runtaskentries[tid].hash = newhash 2353 self.rqdata.runtaskentries[tid].unihash = newuni 2354 changed.add(tid) 2355 2356 next |= self.rqdata.runtaskentries[tid].revdeps 2357 total.remove(tid) 2358 next.intersection_update(total) 2359 2360 if changed: 2361 for mc in self.rq.worker: 2362 self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2363 for mc in self.rq.fakeworker: 2364 self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2365 2366 hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) 2367 2368 for tid in changed: 2369 if tid not in self.rqdata.runq_setscene_tids: 2370 continue 2371 if tid not in self.pending_migrations: 2372 self.pending_migrations.add(tid) 2373 2374 update_tasks = [] 2375 for tid in self.pending_migrations.copy(): 2376 if tid in self.runq_running or tid in self.sq_live: 2377 # Too late, task already running, not much we can do now 2378 self.pending_migrations.remove(tid) 2379 continue 2380 2381 valid = True 2382 # Check no tasks this covers are running 2383 for dep in self.sqdata.sq_covered_tasks[tid]: 2384 if dep in self.runq_running and dep not in self.runq_complete: 2385 hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid)) 2386 valid = False 2387 break 2388 if not valid: 2389 continue 2390 2391 self.pending_migrations.remove(tid) 2392 changed = True 2393 2394 if tid in self.tasks_scenequeue_done: 2395 self.tasks_scenequeue_done.remove(tid) 2396 for dep in self.sqdata.sq_covered_tasks[tid]: 2397 if dep in self.runq_complete and dep not in self.runq_tasksrun: 2398 bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep) 2399 self.failed_tids.append(tid) 2400 self.rq.state = runQueueCleanUp 2401 return 2402 2403 if dep not in self.runq_complete: 2404 if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable: 2405 self.tasks_scenequeue_done.remove(dep) 2406 2407 if tid in self.sq_buildable: 2408 self.sq_buildable.remove(tid) 2409 if tid in self.sq_running: 2410 self.sq_running.remove(tid) 2411 harddepfail = False 2412 for t in self.sqdata.sq_harddeps: 2413 if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered: 2414 harddepfail = True 2415 break 2416 if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2417 if tid not in self.sq_buildable: 2418 self.sq_buildable.add(tid) 2419 if not self.sqdata.sq_revdeps[tid]: 2420 self.sq_buildable.add(tid) 2421 2422 if tid in self.sqdata.outrightfail: 2423 self.sqdata.outrightfail.remove(tid) 2424 if tid in self.scenequeue_notcovered: 2425 self.scenequeue_notcovered.remove(tid) 2426 if tid in self.scenequeue_covered: 2427 self.scenequeue_covered.remove(tid) 2428 if tid in self.scenequeue_notneeded: 2429 self.scenequeue_notneeded.remove(tid) 2430 2431 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2432 self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) 2433 2434 if tid in self.stampcache: 2435 del self.stampcache[tid] 2436 2437 if tid in self.build_stamps: 2438 del self.build_stamps[tid] 2439 2440 update_tasks.append((tid, harddepfail, tid in self.sqdata.valid)) 2441 2442 if update_tasks: 2443 self.sqdone = False 2444 for tid in [t[0] for t in update_tasks]: 2445 h = pending_hash_index(tid, self.rqdata) 2446 if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]: 2447 self.sq_deferred[tid] = self.sqdata.hashes[h] 2448 bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h])) 2449 update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2450 2451 for (tid, harddepfail, origvalid) in update_tasks: 2452 if tid in self.sqdata.valid and not origvalid: 2453 hashequiv_logger.verbose("Setscene task %s became valid" % tid) 2454 if harddepfail: 2455 self.sq_task_failoutright(tid) 2456 2457 if changed: 2458 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2459 self.holdoff_need_update = True 2460 2461 def scenequeue_updatecounters(self, task, fail=False): 2462 2463 for dep in sorted(self.sqdata.sq_deps[task]): 2464 if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: 2465 if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered: 2466 # dependency could be already processed, e.g. noexec setscene task 2467 continue 2468 noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache) 2469 if noexec or stamppresent: 2470 continue 2471 logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) 2472 self.sq_task_failoutright(dep) 2473 continue 2474 if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2475 if dep not in self.sq_buildable: 2476 self.sq_buildable.add(dep) 2477 2478 next = set([task]) 2479 while next: 2480 new = set() 2481 for t in sorted(next): 2482 self.tasks_scenequeue_done.add(t) 2483 # Look down the dependency chain for non-setscene things which this task depends on 2484 # and mark as 'done' 2485 for dep in self.rqdata.runtaskentries[t].depends: 2486 if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: 2487 continue 2488 if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): 2489 new.add(dep) 2490 next = new 2491 2492 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2493 self.holdoff_need_update = True 2494 2495 def sq_task_completeoutright(self, task): 2496 """ 2497 Mark a task as completed 2498 Look at the reverse dependencies and mark any task with 2499 completed dependencies as buildable 2500 """ 2501 2502 logger.debug('Found task %s which could be accelerated', task) 2503 self.scenequeue_covered.add(task) 2504 self.scenequeue_updatecounters(task) 2505 2506 def sq_check_taskfail(self, task): 2507 if self.rqdata.setscene_ignore_tasks is not None: 2508 realtask = task.split('_setscene')[0] 2509 (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) 2510 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2511 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2512 logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) 2513 self.rq.state = runQueueCleanUp 2514 2515 def sq_task_complete(self, task): 2516 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2517 self.sq_task_completeoutright(task) 2518 2519 def sq_task_fail(self, task, result): 2520 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData) 2521 self.scenequeue_notcovered.add(task) 2522 self.scenequeue_updatecounters(task, True) 2523 self.sq_check_taskfail(task) 2524 2525 def sq_task_failoutright(self, task): 2526 self.sq_running.add(task) 2527 self.sq_buildable.add(task) 2528 self.scenequeue_notcovered.add(task) 2529 self.scenequeue_updatecounters(task, True) 2530 2531 def sq_task_skip(self, task): 2532 self.sq_running.add(task) 2533 self.sq_buildable.add(task) 2534 self.sq_task_completeoutright(task) 2535 2536 def sq_build_taskdepdata(self, task): 2537 def getsetscenedeps(tid): 2538 deps = set() 2539 (mc, fn, taskname, _) = split_tid_mcfn(tid) 2540 realtid = tid + "_setscene" 2541 idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends 2542 for (depname, idependtask) in idepends: 2543 if depname not in self.rqdata.taskData[mc].build_targets: 2544 continue 2545 2546 depfn = self.rqdata.taskData[mc].build_targets[depname][0] 2547 if depfn is None: 2548 continue 2549 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2550 deps.add(deptid) 2551 return deps 2552 2553 taskdepdata = {} 2554 next = getsetscenedeps(task) 2555 next.add(task) 2556 while next: 2557 additional = [] 2558 for revdep in next: 2559 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2560 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2561 deps = getsetscenedeps(revdep) 2562 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2563 taskhash = self.rqdata.runtaskentries[revdep].hash 2564 unihash = self.rqdata.runtaskentries[revdep].unihash 2565 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] 2566 for revdep2 in deps: 2567 if revdep2 not in taskdepdata: 2568 additional.append(revdep2) 2569 next = additional 2570 2571 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2572 return taskdepdata 2573 2574 def check_setscene_ignore_tasks(self, tid): 2575 # Check task that is going to run against the ignore tasks list 2576 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2577 # Ignore covered tasks 2578 if tid in self.tasks_covered: 2579 return False 2580 # Ignore stamped tasks 2581 if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): 2582 return False 2583 # Ignore noexec tasks 2584 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2585 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2586 return False 2587 2588 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2589 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2590 if tid in self.rqdata.runq_setscene_tids: 2591 msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)] 2592 else: 2593 msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)] 2594 for t in self.scenequeue_notcovered: 2595 msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash)) 2596 msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered)) 2597 logger.error("".join(msg)) 2598 return True 2599 return False 2600 2601class SQData(object): 2602 def __init__(self): 2603 # SceneQueue dependencies 2604 self.sq_deps = {} 2605 # SceneQueue reverse dependencies 2606 self.sq_revdeps = {} 2607 # Injected inter-setscene task dependencies 2608 self.sq_harddeps = {} 2609 # Cache of stamp files so duplicates can't run in parallel 2610 self.stamps = {} 2611 # Setscene tasks directly depended upon by the build 2612 self.unskippable = set() 2613 # List of setscene tasks which aren't present 2614 self.outrightfail = set() 2615 # A list of normal tasks a setscene task covers 2616 self.sq_covered_tasks = {} 2617 2618def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): 2619 2620 sq_revdeps = {} 2621 sq_revdeps_squash = {} 2622 sq_collated_deps = {} 2623 2624 # We need to construct a dependency graph for the setscene functions. Intermediate 2625 # dependencies between the setscene tasks only complicate the code. This code 2626 # therefore aims to collapse the huge runqueue dependency tree into a smaller one 2627 # only containing the setscene functions. 2628 2629 rqdata.init_progress_reporter.next_stage() 2630 2631 # First process the chains up to the first setscene task. 2632 endpoints = {} 2633 for tid in rqdata.runtaskentries: 2634 sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) 2635 sq_revdeps_squash[tid] = set() 2636 if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids: 2637 #bb.warn("Added endpoint %s" % (tid)) 2638 endpoints[tid] = set() 2639 2640 rqdata.init_progress_reporter.next_stage() 2641 2642 # Secondly process the chains between setscene tasks. 2643 for tid in rqdata.runq_setscene_tids: 2644 sq_collated_deps[tid] = set() 2645 #bb.warn("Added endpoint 2 %s" % (tid)) 2646 for dep in rqdata.runtaskentries[tid].depends: 2647 if tid in sq_revdeps[dep]: 2648 sq_revdeps[dep].remove(tid) 2649 if dep not in endpoints: 2650 endpoints[dep] = set() 2651 #bb.warn(" Added endpoint 3 %s" % (dep)) 2652 endpoints[dep].add(tid) 2653 2654 rqdata.init_progress_reporter.next_stage() 2655 2656 def process_endpoints(endpoints): 2657 newendpoints = {} 2658 for point, task in endpoints.items(): 2659 tasks = set() 2660 if task: 2661 tasks |= task 2662 if sq_revdeps_squash[point]: 2663 tasks |= sq_revdeps_squash[point] 2664 if point not in rqdata.runq_setscene_tids: 2665 for t in tasks: 2666 sq_collated_deps[t].add(point) 2667 sq_revdeps_squash[point] = set() 2668 if point in rqdata.runq_setscene_tids: 2669 sq_revdeps_squash[point] = tasks 2670 continue 2671 for dep in rqdata.runtaskentries[point].depends: 2672 if point in sq_revdeps[dep]: 2673 sq_revdeps[dep].remove(point) 2674 if tasks: 2675 sq_revdeps_squash[dep] |= tasks 2676 if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids: 2677 newendpoints[dep] = task 2678 if newendpoints: 2679 process_endpoints(newendpoints) 2680 2681 process_endpoints(endpoints) 2682 2683 rqdata.init_progress_reporter.next_stage() 2684 2685 # Build a list of tasks which are "unskippable" 2686 # These are direct endpoints referenced by the build upto and including setscene tasks 2687 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon 2688 new = True 2689 for tid in rqdata.runtaskentries: 2690 if not rqdata.runtaskentries[tid].revdeps: 2691 sqdata.unskippable.add(tid) 2692 sqdata.unskippable |= sqrq.cantskip 2693 while new: 2694 new = False 2695 orig = sqdata.unskippable.copy() 2696 for tid in sorted(orig, reverse=True): 2697 if tid in rqdata.runq_setscene_tids: 2698 continue 2699 if not rqdata.runtaskentries[tid].depends: 2700 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable 2701 sqrq.setbuildable(tid) 2702 sqdata.unskippable |= rqdata.runtaskentries[tid].depends 2703 if sqdata.unskippable != orig: 2704 new = True 2705 2706 sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids) 2707 2708 rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) 2709 2710 # Sanity check all dependencies could be changed to setscene task references 2711 for taskcounter, tid in enumerate(rqdata.runtaskentries): 2712 if tid in rqdata.runq_setscene_tids: 2713 pass 2714 elif sq_revdeps_squash[tid]: 2715 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.") 2716 else: 2717 del sq_revdeps_squash[tid] 2718 rqdata.init_progress_reporter.update(taskcounter) 2719 2720 rqdata.init_progress_reporter.next_stage() 2721 2722 # Resolve setscene inter-task dependencies 2723 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" 2724 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies 2725 for tid in rqdata.runq_setscene_tids: 2726 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2727 realtid = tid + "_setscene" 2728 idepends = rqdata.taskData[mc].taskentries[realtid].idepends 2729 sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", rqdata.dataCaches[mc], taskfn, noextra=True) 2730 for (depname, idependtask) in idepends: 2731 2732 if depname not in rqdata.taskData[mc].build_targets: 2733 continue 2734 2735 depfn = rqdata.taskData[mc].build_targets[depname][0] 2736 if depfn is None: 2737 continue 2738 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2739 if deptid not in rqdata.runtaskentries: 2740 bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) 2741 2742 if not deptid in sqdata.sq_harddeps: 2743 sqdata.sq_harddeps[deptid] = set() 2744 sqdata.sq_harddeps[deptid].add(tid) 2745 2746 sq_revdeps_squash[tid].add(deptid) 2747 # Have to zero this to avoid circular dependencies 2748 sq_revdeps_squash[deptid] = set() 2749 2750 rqdata.init_progress_reporter.next_stage() 2751 2752 for task in sqdata.sq_harddeps: 2753 for dep in sqdata.sq_harddeps[task]: 2754 sq_revdeps_squash[dep].add(task) 2755 2756 rqdata.init_progress_reporter.next_stage() 2757 2758 #for tid in sq_revdeps_squash: 2759 # data = "" 2760 # for dep in sq_revdeps_squash[tid]: 2761 # data = data + "\n %s" % dep 2762 # bb.warn("Task %s_setscene: is %s " % (tid, data)) 2763 2764 sqdata.sq_revdeps = sq_revdeps_squash 2765 sqdata.sq_covered_tasks = sq_collated_deps 2766 2767 # Build reverse version of revdeps to populate deps structure 2768 for tid in sqdata.sq_revdeps: 2769 sqdata.sq_deps[tid] = set() 2770 for tid in sqdata.sq_revdeps: 2771 for dep in sqdata.sq_revdeps[tid]: 2772 sqdata.sq_deps[dep].add(tid) 2773 2774 rqdata.init_progress_reporter.next_stage() 2775 2776 sqdata.multiconfigs = set() 2777 for tid in sqdata.sq_revdeps: 2778 sqdata.multiconfigs.add(mc_from_tid(tid)) 2779 if not sqdata.sq_revdeps[tid]: 2780 sqrq.sq_buildable.add(tid) 2781 2782 rqdata.init_progress_reporter.finish() 2783 2784 sqdata.noexec = set() 2785 sqdata.stamppresent = set() 2786 sqdata.valid = set() 2787 2788 sqdata.hashes = {} 2789 sqrq.sq_deferred = {} 2790 for mc in sorted(sqdata.multiconfigs): 2791 for tid in sorted(sqdata.sq_revdeps): 2792 if mc_from_tid(tid) != mc: 2793 continue 2794 h = pending_hash_index(tid, rqdata) 2795 if h not in sqdata.hashes: 2796 sqdata.hashes[h] = tid 2797 else: 2798 sqrq.sq_deferred[tid] = sqdata.hashes[h] 2799 bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h])) 2800 2801 update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True) 2802 2803 # Compute a list of 'stale' sstate tasks where the current hash does not match the one 2804 # in any stamp files. Pass the list out to metadata as an event. 2805 found = {} 2806 for tid in rqdata.runq_setscene_tids: 2807 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2808 stamps = bb.build.find_stale_stamps(taskname, rqdata.dataCaches[mc], taskfn) 2809 if stamps: 2810 if mc not in found: 2811 found[mc] = {} 2812 found[mc][tid] = stamps 2813 for mc in found: 2814 event = bb.event.StaleSetSceneTasks(found[mc]) 2815 bb.event.fire(event, cooker.databuilder.mcdata[mc]) 2816 2817def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False): 2818 2819 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2820 2821 taskdep = rqdata.dataCaches[mc].task_deps[taskfn] 2822 2823 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2824 bb.build.make_stamp(taskname + "_setscene", rqdata.dataCaches[mc], taskfn) 2825 return True, False 2826 2827 if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache): 2828 logger.debug2('Setscene stamp current for task %s', tid) 2829 return False, True 2830 2831 if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache): 2832 logger.debug2('Normal stamp current for task %s', tid) 2833 return False, True 2834 2835 return False, False 2836 2837def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): 2838 2839 tocheck = set() 2840 2841 for tid in sorted(tids): 2842 if tid in sqdata.stamppresent: 2843 sqdata.stamppresent.remove(tid) 2844 if tid in sqdata.valid: 2845 sqdata.valid.remove(tid) 2846 if tid in sqdata.outrightfail: 2847 sqdata.outrightfail.remove(tid) 2848 2849 noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True) 2850 2851 if noexec: 2852 sqdata.noexec.add(tid) 2853 sqrq.sq_task_skip(tid) 2854 continue 2855 2856 if stamppresent: 2857 sqdata.stamppresent.add(tid) 2858 sqrq.sq_task_skip(tid) 2859 continue 2860 2861 tocheck.add(tid) 2862 2863 sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) 2864 2865 for tid in tids: 2866 if tid in sqdata.stamppresent: 2867 continue 2868 if tid in sqdata.valid: 2869 continue 2870 if tid in sqdata.noexec: 2871 continue 2872 if tid in sqrq.scenequeue_covered: 2873 continue 2874 if tid in sqrq.scenequeue_notcovered: 2875 continue 2876 if tid in sqrq.sq_deferred: 2877 continue 2878 sqdata.outrightfail.add(tid) 2879 2880class TaskFailure(Exception): 2881 """ 2882 Exception raised when a task in a runqueue fails 2883 """ 2884 def __init__(self, x): 2885 self.args = x 2886 2887 2888class runQueueExitWait(bb.event.Event): 2889 """ 2890 Event when waiting for task processes to exit 2891 """ 2892 2893 def __init__(self, remain): 2894 self.remain = remain 2895 self.message = "Waiting for %s active tasks to finish" % remain 2896 bb.event.Event.__init__(self) 2897 2898class runQueueEvent(bb.event.Event): 2899 """ 2900 Base runQueue event class 2901 """ 2902 def __init__(self, task, stats, rq): 2903 self.taskid = task 2904 self.taskstring = task 2905 self.taskname = taskname_from_tid(task) 2906 self.taskfile = fn_from_tid(task) 2907 self.taskhash = rq.rqdata.get_task_hash(task) 2908 self.stats = stats.copy() 2909 bb.event.Event.__init__(self) 2910 2911class sceneQueueEvent(runQueueEvent): 2912 """ 2913 Base sceneQueue event class 2914 """ 2915 def __init__(self, task, stats, rq, noexec=False): 2916 runQueueEvent.__init__(self, task, stats, rq) 2917 self.taskstring = task + "_setscene" 2918 self.taskname = taskname_from_tid(task) + "_setscene" 2919 self.taskfile = fn_from_tid(task) 2920 self.taskhash = rq.rqdata.get_task_hash(task) 2921 2922class runQueueTaskStarted(runQueueEvent): 2923 """ 2924 Event notifying a task was started 2925 """ 2926 def __init__(self, task, stats, rq, noexec=False): 2927 runQueueEvent.__init__(self, task, stats, rq) 2928 self.noexec = noexec 2929 2930class sceneQueueTaskStarted(sceneQueueEvent): 2931 """ 2932 Event notifying a setscene task was started 2933 """ 2934 def __init__(self, task, stats, rq, noexec=False): 2935 sceneQueueEvent.__init__(self, task, stats, rq) 2936 self.noexec = noexec 2937 2938class runQueueTaskFailed(runQueueEvent): 2939 """ 2940 Event notifying a task failed 2941 """ 2942 def __init__(self, task, stats, exitcode, rq, fakeroot_log=None): 2943 runQueueEvent.__init__(self, task, stats, rq) 2944 self.exitcode = exitcode 2945 self.fakeroot_log = fakeroot_log 2946 2947 def __str__(self): 2948 if self.fakeroot_log: 2949 return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log) 2950 else: 2951 return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) 2952 2953class sceneQueueTaskFailed(sceneQueueEvent): 2954 """ 2955 Event notifying a setscene task failed 2956 """ 2957 def __init__(self, task, stats, exitcode, rq): 2958 sceneQueueEvent.__init__(self, task, stats, rq) 2959 self.exitcode = exitcode 2960 2961 def __str__(self): 2962 return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode) 2963 2964class sceneQueueComplete(sceneQueueEvent): 2965 """ 2966 Event when all the sceneQueue tasks are complete 2967 """ 2968 def __init__(self, stats, rq): 2969 self.stats = stats.copy() 2970 bb.event.Event.__init__(self) 2971 2972class runQueueTaskCompleted(runQueueEvent): 2973 """ 2974 Event notifying a task completed 2975 """ 2976 2977class sceneQueueTaskCompleted(sceneQueueEvent): 2978 """ 2979 Event notifying a setscene task completed 2980 """ 2981 2982class runQueueTaskSkipped(runQueueEvent): 2983 """ 2984 Event notifying a task was skipped 2985 """ 2986 def __init__(self, task, stats, rq, reason): 2987 runQueueEvent.__init__(self, task, stats, rq) 2988 self.reason = reason 2989 2990class taskUniHashUpdate(bb.event.Event): 2991 """ 2992 Base runQueue event class 2993 """ 2994 def __init__(self, task, unihash): 2995 self.taskid = task 2996 self.unihash = unihash 2997 bb.event.Event.__init__(self) 2998 2999class runQueuePipe(): 3000 """ 3001 Abstraction for a pipe between a worker thread and the server 3002 """ 3003 def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None): 3004 self.input = pipein 3005 if pipeout: 3006 pipeout.close() 3007 bb.utils.nonblockingfd(self.input) 3008 self.queue = b"" 3009 self.d = d 3010 self.rq = rq 3011 self.rqexec = rqexec 3012 self.fakerootlogs = fakerootlogs 3013 3014 def setrunqueueexec(self, rqexec): 3015 self.rqexec = rqexec 3016 3017 def read(self): 3018 for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]: 3019 for worker in workers.values(): 3020 worker.process.poll() 3021 if worker.process.returncode is not None and not self.rq.teardown: 3022 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode))) 3023 self.rq.finish_runqueue(True) 3024 3025 start = len(self.queue) 3026 try: 3027 self.queue = self.queue + (self.input.read(102400) or b"") 3028 except (OSError, IOError) as e: 3029 if e.errno != errno.EAGAIN: 3030 raise 3031 end = len(self.queue) 3032 found = True 3033 while found and self.queue: 3034 found = False 3035 index = self.queue.find(b"</event>") 3036 while index != -1 and self.queue.startswith(b"<event>"): 3037 try: 3038 event = pickle.loads(self.queue[7:index]) 3039 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3040 if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e): 3041 # The pickled data could contain "</event>" so search for the next occurance 3042 # unpickling again, this should be the only way an unpickle error could occur 3043 index = self.queue.find(b"</event>", index + 1) 3044 continue 3045 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) 3046 bb.event.fire_from_worker(event, self.d) 3047 if isinstance(event, taskUniHashUpdate): 3048 self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) 3049 found = True 3050 self.queue = self.queue[index+8:] 3051 index = self.queue.find(b"</event>") 3052 index = self.queue.find(b"</exitcode>") 3053 while index != -1 and self.queue.startswith(b"<exitcode>"): 3054 try: 3055 task, status = pickle.loads(self.queue[10:index]) 3056 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3057 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) 3058 (_, _, _, taskfn) = split_tid_mcfn(task) 3059 fakerootlog = None 3060 if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs: 3061 fakerootlog = self.fakerootlogs[taskfn] 3062 self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog) 3063 found = True 3064 self.queue = self.queue[index+11:] 3065 index = self.queue.find(b"</exitcode>") 3066 return (end > start) 3067 3068 def close(self): 3069 while self.read(): 3070 continue 3071 if self.queue: 3072 print("Warning, worker left partial message: %s" % self.queue) 3073 self.input.close() 3074 3075def get_setscene_enforce_ignore_tasks(d, targets): 3076 if d.getVar('BB_SETSCENE_ENFORCE') != '1': 3077 return None 3078 ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split() 3079 outlist = [] 3080 for item in ignore_tasks[:]: 3081 if item.startswith('%:'): 3082 for (mc, target, task, fn) in targets: 3083 outlist.append(target + ':' + item.split(':')[1]) 3084 else: 3085 outlist.append(item) 3086 return outlist 3087 3088def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks): 3089 import fnmatch 3090 if ignore_tasks is not None: 3091 item = '%s:%s' % (pn, taskname) 3092 for ignore_tasks in ignore_tasks: 3093 if fnmatch.fnmatch(item, ignore_tasks): 3094 return True 3095 return False 3096 return True 3097