1""" 2BitBake 'RunQueue' implementation 3 4Handles preparation and execution of a queue of tasks 5""" 6 7# Copyright (C) 2006-2007 Richard Purdie 8# 9# SPDX-License-Identifier: GPL-2.0-only 10# 11 12import copy 13import os 14import sys 15import stat 16import errno 17import logging 18import re 19import bb 20from bb import msg, event 21from bb import monitordisk 22import subprocess 23import pickle 24from multiprocessing import Process 25import shlex 26import pprint 27 28bblogger = logging.getLogger("BitBake") 29logger = logging.getLogger("BitBake.RunQueue") 30hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv") 31 32__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' ) 33 34def fn_from_tid(tid): 35 return tid.rsplit(":", 1)[0] 36 37def taskname_from_tid(tid): 38 return tid.rsplit(":", 1)[1] 39 40def mc_from_tid(tid): 41 if tid.startswith('mc:') and tid.count(':') >= 2: 42 return tid.split(':')[1] 43 return "" 44 45def split_tid(tid): 46 (mc, fn, taskname, _) = split_tid_mcfn(tid) 47 return (mc, fn, taskname) 48 49def split_mc(n): 50 if n.startswith("mc:") and n.count(':') >= 2: 51 _, mc, n = n.split(":", 2) 52 return (mc, n) 53 return ('', n) 54 55def split_tid_mcfn(tid): 56 if tid.startswith('mc:') and tid.count(':') >= 2: 57 elems = tid.split(':') 58 mc = elems[1] 59 fn = ":".join(elems[2:-1]) 60 taskname = elems[-1] 61 mcfn = "mc:" + mc + ":" + fn 62 else: 63 tid = tid.rsplit(":", 1) 64 mc = "" 65 fn = tid[0] 66 taskname = tid[1] 67 mcfn = fn 68 69 return (mc, fn, taskname, mcfn) 70 71def build_tid(mc, fn, taskname): 72 if mc: 73 return "mc:" + mc + ":" + fn + ":" + taskname 74 return fn + ":" + taskname 75 76# Index used to pair up potentially matching multiconfig tasks 77# We match on PN, taskname and hash being equal 78def pending_hash_index(tid, rqdata): 79 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 80 pn = rqdata.dataCaches[mc].pkg_fn[taskfn] 81 h = rqdata.runtaskentries[tid].unihash 82 return pn + ":" + "taskname" + h 83 84class RunQueueStats: 85 """ 86 Holds statistics on the tasks handled by the associated runQueue 87 """ 88 def __init__(self, total): 89 self.completed = 0 90 self.skipped = 0 91 self.failed = 0 92 self.active = 0 93 self.total = total 94 95 def copy(self): 96 obj = self.__class__(self.total) 97 obj.__dict__.update(self.__dict__) 98 return obj 99 100 def taskFailed(self): 101 self.active = self.active - 1 102 self.failed = self.failed + 1 103 104 def taskCompleted(self): 105 self.active = self.active - 1 106 self.completed = self.completed + 1 107 108 def taskSkipped(self): 109 self.active = self.active + 1 110 self.skipped = self.skipped + 1 111 112 def taskActive(self): 113 self.active = self.active + 1 114 115# These values indicate the next step due to be run in the 116# runQueue state machine 117runQueuePrepare = 2 118runQueueSceneInit = 3 119runQueueRunning = 6 120runQueueFailed = 7 121runQueueCleanUp = 8 122runQueueComplete = 9 123 124class RunQueueScheduler(object): 125 """ 126 Control the order tasks are scheduled in. 127 """ 128 name = "basic" 129 130 def __init__(self, runqueue, rqdata): 131 """ 132 The default scheduler just returns the first buildable task (the 133 priority map is sorted by task number) 134 """ 135 self.rq = runqueue 136 self.rqdata = rqdata 137 self.numTasks = len(self.rqdata.runtaskentries) 138 139 self.prio_map = [self.rqdata.runtaskentries.keys()] 140 141 self.buildable = set() 142 self.skip_maxthread = {} 143 self.stamps = {} 144 for tid in self.rqdata.runtaskentries: 145 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 146 self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 147 if tid in self.rq.runq_buildable: 148 self.buildable.append(tid) 149 150 self.rev_prio_map = None 151 152 def next_buildable_task(self): 153 """ 154 Return the id of the first task we find that is buildable 155 """ 156 # Once tasks are running we don't need to worry about them again 157 self.buildable.difference_update(self.rq.runq_running) 158 buildable = set(self.buildable) 159 buildable.difference_update(self.rq.holdoff_tasks) 160 buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered) 161 if not buildable: 162 return None 163 164 # Filter out tasks that have a max number of threads that have been exceeded 165 skip_buildable = {} 166 for running in self.rq.runq_running.difference(self.rq.runq_complete): 167 rtaskname = taskname_from_tid(running) 168 if rtaskname not in self.skip_maxthread: 169 self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads") 170 if not self.skip_maxthread[rtaskname]: 171 continue 172 if rtaskname in skip_buildable: 173 skip_buildable[rtaskname] += 1 174 else: 175 skip_buildable[rtaskname] = 1 176 177 if len(buildable) == 1: 178 tid = buildable.pop() 179 taskname = taskname_from_tid(tid) 180 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 181 return None 182 stamp = self.stamps[tid] 183 if stamp not in self.rq.build_stamps.values(): 184 return tid 185 186 if not self.rev_prio_map: 187 self.rev_prio_map = {} 188 for tid in self.rqdata.runtaskentries: 189 self.rev_prio_map[tid] = self.prio_map.index(tid) 190 191 best = None 192 bestprio = None 193 for tid in buildable: 194 taskname = taskname_from_tid(tid) 195 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 196 continue 197 prio = self.rev_prio_map[tid] 198 if bestprio is None or bestprio > prio: 199 stamp = self.stamps[tid] 200 if stamp in self.rq.build_stamps.values(): 201 continue 202 bestprio = prio 203 best = tid 204 205 return best 206 207 def next(self): 208 """ 209 Return the id of the task we should build next 210 """ 211 if self.rq.can_start_task(): 212 return self.next_buildable_task() 213 214 def newbuildable(self, task): 215 self.buildable.add(task) 216 217 def removebuildable(self, task): 218 self.buildable.remove(task) 219 220 def describe_task(self, taskid): 221 result = 'ID %s' % taskid 222 if self.rev_prio_map: 223 result = result + (' pri %d' % self.rev_prio_map[taskid]) 224 return result 225 226 def dump_prio(self, comment): 227 bb.debug(3, '%s (most important first):\n%s' % 228 (comment, 229 '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for 230 index, taskid in enumerate(self.prio_map)]))) 231 232class RunQueueSchedulerSpeed(RunQueueScheduler): 233 """ 234 A scheduler optimised for speed. The priority map is sorted by task weight, 235 heavier weighted tasks (tasks needed by the most other tasks) are run first. 236 """ 237 name = "speed" 238 239 def __init__(self, runqueue, rqdata): 240 """ 241 The priority map is sorted by task weight. 242 """ 243 RunQueueScheduler.__init__(self, runqueue, rqdata) 244 245 weights = {} 246 for tid in self.rqdata.runtaskentries: 247 weight = self.rqdata.runtaskentries[tid].weight 248 if not weight in weights: 249 weights[weight] = [] 250 weights[weight].append(tid) 251 252 self.prio_map = [] 253 for weight in sorted(weights): 254 for w in weights[weight]: 255 self.prio_map.append(w) 256 257 self.prio_map.reverse() 258 259class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): 260 """ 261 A scheduler optimised to complete .bb files as quickly as possible. The 262 priority map is sorted by task weight, but then reordered so once a given 263 .bb file starts to build, it's completed as quickly as possible by 264 running all tasks related to the same .bb file one after the after. 265 This works well where disk space is at a premium and classes like OE's 266 rm_work are in force. 267 """ 268 name = "completion" 269 270 def __init__(self, runqueue, rqdata): 271 super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata) 272 273 # Extract list of tasks for each recipe, with tasks sorted 274 # ascending from "must run first" (typically do_fetch) to 275 # "runs last" (do_build). The speed scheduler prioritizes 276 # tasks that must run first before the ones that run later; 277 # this is what we depend on here. 278 task_lists = {} 279 for taskid in self.prio_map: 280 fn, taskname = taskid.rsplit(':', 1) 281 task_lists.setdefault(fn, []).append(taskname) 282 283 # Now unify the different task lists. The strategy is that 284 # common tasks get skipped and new ones get inserted after the 285 # preceeding common one(s) as they are found. Because task 286 # lists should differ only by their number of tasks, but not 287 # the ordering of the common tasks, this should result in a 288 # deterministic result that is a superset of the individual 289 # task ordering. 290 all_tasks = [] 291 for recipe, new_tasks in task_lists.items(): 292 index = 0 293 old_task = all_tasks[index] if index < len(all_tasks) else None 294 for new_task in new_tasks: 295 if old_task == new_task: 296 # Common task, skip it. This is the fast-path which 297 # avoids a full search. 298 index += 1 299 old_task = all_tasks[index] if index < len(all_tasks) else None 300 else: 301 try: 302 index = all_tasks.index(new_task) 303 # Already present, just not at the current 304 # place. We re-synchronized by changing the 305 # index so that it matches again. Now 306 # move on to the next existing task. 307 index += 1 308 old_task = all_tasks[index] if index < len(all_tasks) else None 309 except ValueError: 310 # Not present. Insert before old_task, which 311 # remains the same (but gets shifted back). 312 all_tasks.insert(index, new_task) 313 index += 1 314 bb.debug(3, 'merged task list: %s' % all_tasks) 315 316 # Now reverse the order so that tasks that finish the work on one 317 # recipe are considered more imporant (= come first). The ordering 318 # is now so that do_build is most important. 319 all_tasks.reverse() 320 321 # Group tasks of the same kind before tasks of less important 322 # kinds at the head of the queue (because earlier = lower 323 # priority number = runs earlier), while preserving the 324 # ordering by recipe. If recipe foo is more important than 325 # bar, then the goal is to work on foo's do_populate_sysroot 326 # before bar's do_populate_sysroot and on the more important 327 # tasks of foo before any of the less important tasks in any 328 # other recipe (if those other recipes are more important than 329 # foo). 330 # 331 # All of this only applies when tasks are runable. Explicit 332 # dependencies still override this ordering by priority. 333 # 334 # Here's an example why this priority re-ordering helps with 335 # minimizing disk usage. Consider a recipe foo with a higher 336 # priority than bar where foo DEPENDS on bar. Then the 337 # implicit rule (from base.bbclass) is that foo's do_configure 338 # depends on bar's do_populate_sysroot. This ensures that 339 # bar's do_populate_sysroot gets done first. Normally the 340 # tasks from foo would continue to run once that is done, and 341 # bar only gets completed and cleaned up later. By ordering 342 # bar's task that depend on bar's do_populate_sysroot before foo's 343 # do_configure, that problem gets avoided. 344 task_index = 0 345 self.dump_prio('original priorities') 346 for task in all_tasks: 347 for index in range(task_index, self.numTasks): 348 taskid = self.prio_map[index] 349 taskname = taskid.rsplit(':', 1)[1] 350 if taskname == task: 351 del self.prio_map[index] 352 self.prio_map.insert(task_index, taskid) 353 task_index += 1 354 self.dump_prio('completion priorities') 355 356class RunTaskEntry(object): 357 def __init__(self): 358 self.depends = set() 359 self.revdeps = set() 360 self.hash = None 361 self.unihash = None 362 self.task = None 363 self.weight = 1 364 365class RunQueueData: 366 """ 367 BitBake Run Queue implementation 368 """ 369 def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets): 370 self.cooker = cooker 371 self.dataCaches = dataCaches 372 self.taskData = taskData 373 self.targets = targets 374 self.rq = rq 375 self.warn_multi_bb = False 376 377 self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST") or "" 378 self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST") or "").split() 379 self.setscenewhitelist = get_setscene_enforce_whitelist(cfgData, targets) 380 self.setscenewhitelist_checked = False 381 self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1") 382 self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter() 383 384 self.reset() 385 386 def reset(self): 387 self.runtaskentries = {} 388 389 def runq_depends_names(self, ids): 390 import re 391 ret = [] 392 for id in ids: 393 nam = os.path.basename(id) 394 nam = re.sub("_[^,]*,", ",", nam) 395 ret.extend([nam]) 396 return ret 397 398 def get_task_hash(self, tid): 399 return self.runtaskentries[tid].hash 400 401 def get_task_unihash(self, tid): 402 return self.runtaskentries[tid].unihash 403 404 def get_user_idstring(self, tid, task_name_suffix = ""): 405 return tid + task_name_suffix 406 407 def get_short_user_idstring(self, task, task_name_suffix = ""): 408 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 409 pn = self.dataCaches[mc].pkg_fn[taskfn] 410 taskname = taskname_from_tid(task) + task_name_suffix 411 return "%s:%s" % (pn, taskname) 412 413 def circular_depchains_handler(self, tasks): 414 """ 415 Some tasks aren't buildable, likely due to circular dependency issues. 416 Identify the circular dependencies and print them in a user readable format. 417 """ 418 from copy import deepcopy 419 420 valid_chains = [] 421 explored_deps = {} 422 msgs = [] 423 424 class TooManyLoops(Exception): 425 pass 426 427 def chain_reorder(chain): 428 """ 429 Reorder a dependency chain so the lowest task id is first 430 """ 431 lowest = 0 432 new_chain = [] 433 for entry in range(len(chain)): 434 if chain[entry] < chain[lowest]: 435 lowest = entry 436 new_chain.extend(chain[lowest:]) 437 new_chain.extend(chain[:lowest]) 438 return new_chain 439 440 def chain_compare_equal(chain1, chain2): 441 """ 442 Compare two dependency chains and see if they're the same 443 """ 444 if len(chain1) != len(chain2): 445 return False 446 for index in range(len(chain1)): 447 if chain1[index] != chain2[index]: 448 return False 449 return True 450 451 def chain_array_contains(chain, chain_array): 452 """ 453 Return True if chain_array contains chain 454 """ 455 for ch in chain_array: 456 if chain_compare_equal(ch, chain): 457 return True 458 return False 459 460 def find_chains(tid, prev_chain): 461 prev_chain.append(tid) 462 total_deps = [] 463 total_deps.extend(self.runtaskentries[tid].revdeps) 464 for revdep in self.runtaskentries[tid].revdeps: 465 if revdep in prev_chain: 466 idx = prev_chain.index(revdep) 467 # To prevent duplicates, reorder the chain to start with the lowest taskid 468 # and search through an array of those we've already printed 469 chain = prev_chain[idx:] 470 new_chain = chain_reorder(chain) 471 if not chain_array_contains(new_chain, valid_chains): 472 valid_chains.append(new_chain) 473 msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) 474 for dep in new_chain: 475 msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends))) 476 msgs.append("\n") 477 if len(valid_chains) > 10: 478 msgs.append("Aborted dependency loops search after 10 matches.\n") 479 raise TooManyLoops 480 continue 481 scan = False 482 if revdep not in explored_deps: 483 scan = True 484 elif revdep in explored_deps[revdep]: 485 scan = True 486 else: 487 for dep in prev_chain: 488 if dep in explored_deps[revdep]: 489 scan = True 490 if scan: 491 find_chains(revdep, copy.deepcopy(prev_chain)) 492 for dep in explored_deps[revdep]: 493 if dep not in total_deps: 494 total_deps.append(dep) 495 496 explored_deps[tid] = total_deps 497 498 try: 499 for task in tasks: 500 find_chains(task, []) 501 except TooManyLoops: 502 pass 503 504 return msgs 505 506 def calculate_task_weights(self, endpoints): 507 """ 508 Calculate a number representing the "weight" of each task. Heavier weighted tasks 509 have more dependencies and hence should be executed sooner for maximum speed. 510 511 This function also sanity checks the task list finding tasks that are not 512 possible to execute due to circular dependencies. 513 """ 514 515 numTasks = len(self.runtaskentries) 516 weight = {} 517 deps_left = {} 518 task_done = {} 519 520 for tid in self.runtaskentries: 521 task_done[tid] = False 522 weight[tid] = 1 523 deps_left[tid] = len(self.runtaskentries[tid].revdeps) 524 525 for tid in endpoints: 526 weight[tid] = 10 527 task_done[tid] = True 528 529 while True: 530 next_points = [] 531 for tid in endpoints: 532 for revdep in self.runtaskentries[tid].depends: 533 weight[revdep] = weight[revdep] + weight[tid] 534 deps_left[revdep] = deps_left[revdep] - 1 535 if deps_left[revdep] == 0: 536 next_points.append(revdep) 537 task_done[revdep] = True 538 endpoints = next_points 539 if len(next_points) == 0: 540 break 541 542 # Circular dependency sanity check 543 problem_tasks = [] 544 for tid in self.runtaskentries: 545 if task_done[tid] is False or deps_left[tid] != 0: 546 problem_tasks.append(tid) 547 logger.debug2("Task %s is not buildable", tid) 548 logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) 549 self.runtaskentries[tid].weight = weight[tid] 550 551 if problem_tasks: 552 message = "%s unbuildable tasks were found.\n" % len(problem_tasks) 553 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" 554 message = message + "Identifying dependency loops (this may take a short while)...\n" 555 logger.error(message) 556 557 msgs = self.circular_depchains_handler(problem_tasks) 558 559 message = "\n" 560 for msg in msgs: 561 message = message + msg 562 bb.msg.fatal("RunQueue", message) 563 564 return weight 565 566 def prepare(self): 567 """ 568 Turn a set of taskData into a RunQueue and compute data needed 569 to optimise the execution order. 570 """ 571 572 runq_build = {} 573 recursivetasks = {} 574 recursiveitasks = {} 575 recursivetasksselfref = set() 576 577 taskData = self.taskData 578 579 found = False 580 for mc in self.taskData: 581 if len(taskData[mc].taskentries) > 0: 582 found = True 583 break 584 if not found: 585 # Nothing to do 586 return 0 587 588 self.init_progress_reporter.start() 589 self.init_progress_reporter.next_stage() 590 591 # Step A - Work out a list of tasks to run 592 # 593 # Taskdata gives us a list of possible providers for every build and run 594 # target ordered by priority. It also gives information on each of those 595 # providers. 596 # 597 # To create the actual list of tasks to execute we fix the list of 598 # providers and then resolve the dependencies into task IDs. This 599 # process is repeated for each type of dependency (tdepends, deptask, 600 # rdeptast, recrdeptask, idepends). 601 602 def add_build_dependencies(depids, tasknames, depends, mc): 603 for depname in depids: 604 # Won't be in build_targets if ASSUME_PROVIDED 605 if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]: 606 continue 607 depdata = taskData[mc].build_targets[depname][0] 608 if depdata is None: 609 continue 610 for taskname in tasknames: 611 t = depdata + ":" + taskname 612 if t in taskData[mc].taskentries: 613 depends.add(t) 614 615 def add_runtime_dependencies(depids, tasknames, depends, mc): 616 for depname in depids: 617 if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]: 618 continue 619 depdata = taskData[mc].run_targets[depname][0] 620 if depdata is None: 621 continue 622 for taskname in tasknames: 623 t = depdata + ":" + taskname 624 if t in taskData[mc].taskentries: 625 depends.add(t) 626 627 def add_mc_dependencies(mc, tid): 628 mcdeps = taskData[mc].get_mcdepends() 629 for dep in mcdeps: 630 mcdependency = dep.split(':') 631 pn = mcdependency[3] 632 frommc = mcdependency[1] 633 mcdep = mcdependency[2] 634 deptask = mcdependency[4] 635 if mc == frommc: 636 fn = taskData[mcdep].build_targets[pn][0] 637 newdep = '%s:%s' % (fn,deptask) 638 taskData[mc].taskentries[tid].tdepends.append(newdep) 639 640 for mc in taskData: 641 for tid in taskData[mc].taskentries: 642 643 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 644 #runtid = build_tid(mc, fn, taskname) 645 646 #logger.debug2("Processing %s,%s:%s", mc, fn, taskname) 647 648 depends = set() 649 task_deps = self.dataCaches[mc].task_deps[taskfn] 650 651 self.runtaskentries[tid] = RunTaskEntry() 652 653 if fn in taskData[mc].failed_fns: 654 continue 655 656 # We add multiconfig dependencies before processing internal task deps (tdepends) 657 if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']: 658 add_mc_dependencies(mc, tid) 659 660 # Resolve task internal dependencies 661 # 662 # e.g. addtask before X after Y 663 for t in taskData[mc].taskentries[tid].tdepends: 664 (depmc, depfn, deptaskname, _) = split_tid_mcfn(t) 665 depends.add(build_tid(depmc, depfn, deptaskname)) 666 667 # Resolve 'deptask' dependencies 668 # 669 # e.g. do_sometask[deptask] = "do_someothertask" 670 # (makes sure sometask runs after someothertask of all DEPENDS) 671 if 'deptask' in task_deps and taskname in task_deps['deptask']: 672 tasknames = task_deps['deptask'][taskname].split() 673 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 674 675 # Resolve 'rdeptask' dependencies 676 # 677 # e.g. do_sometask[rdeptask] = "do_someothertask" 678 # (makes sure sometask runs after someothertask of all RDEPENDS) 679 if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']: 680 tasknames = task_deps['rdeptask'][taskname].split() 681 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 682 683 # Resolve inter-task dependencies 684 # 685 # e.g. do_sometask[depends] = "targetname:do_someothertask" 686 # (makes sure sometask runs after targetname's someothertask) 687 idepends = taskData[mc].taskentries[tid].idepends 688 for (depname, idependtask) in idepends: 689 if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps: 690 # Won't be in build_targets if ASSUME_PROVIDED 691 depdata = taskData[mc].build_targets[depname][0] 692 if depdata is not None: 693 t = depdata + ":" + idependtask 694 depends.add(t) 695 if t not in taskData[mc].taskentries: 696 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 697 irdepends = taskData[mc].taskentries[tid].irdepends 698 for (depname, idependtask) in irdepends: 699 if depname in taskData[mc].run_targets: 700 # Won't be in run_targets if ASSUME_PROVIDED 701 if not taskData[mc].run_targets[depname]: 702 continue 703 depdata = taskData[mc].run_targets[depname][0] 704 if depdata is not None: 705 t = depdata + ":" + idependtask 706 depends.add(t) 707 if t not in taskData[mc].taskentries: 708 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 709 710 # Resolve recursive 'recrdeptask' dependencies (Part A) 711 # 712 # e.g. do_sometask[recrdeptask] = "do_someothertask" 713 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 714 # We cover the recursive part of the dependencies below 715 if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']: 716 tasknames = task_deps['recrdeptask'][taskname].split() 717 recursivetasks[tid] = tasknames 718 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 719 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 720 if taskname in tasknames: 721 recursivetasksselfref.add(tid) 722 723 if 'recideptask' in task_deps and taskname in task_deps['recideptask']: 724 recursiveitasks[tid] = [] 725 for t in task_deps['recideptask'][taskname].split(): 726 newdep = build_tid(mc, fn, t) 727 recursiveitasks[tid].append(newdep) 728 729 self.runtaskentries[tid].depends = depends 730 # Remove all self references 731 self.runtaskentries[tid].depends.discard(tid) 732 733 #self.dump_data() 734 735 self.init_progress_reporter.next_stage() 736 737 # Resolve recursive 'recrdeptask' dependencies (Part B) 738 # 739 # e.g. do_sometask[recrdeptask] = "do_someothertask" 740 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 741 # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed 742 743 # Generating/interating recursive lists of dependencies is painful and potentially slow 744 # Precompute recursive task dependencies here by: 745 # a) create a temp list of reverse dependencies (revdeps) 746 # b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0) 747 # c) combine the total list of dependencies in cumulativedeps 748 # d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower) 749 750 751 revdeps = {} 752 deps = {} 753 cumulativedeps = {} 754 for tid in self.runtaskentries: 755 deps[tid] = set(self.runtaskentries[tid].depends) 756 revdeps[tid] = set() 757 cumulativedeps[tid] = set() 758 # Generate a temp list of reverse dependencies 759 for tid in self.runtaskentries: 760 for dep in self.runtaskentries[tid].depends: 761 revdeps[dep].add(tid) 762 # Find the dependency chain endpoints 763 endpoints = set() 764 for tid in self.runtaskentries: 765 if len(deps[tid]) == 0: 766 endpoints.add(tid) 767 # Iterate the chains collating dependencies 768 while endpoints: 769 next = set() 770 for tid in endpoints: 771 for dep in revdeps[tid]: 772 cumulativedeps[dep].add(fn_from_tid(tid)) 773 cumulativedeps[dep].update(cumulativedeps[tid]) 774 if tid in deps[dep]: 775 deps[dep].remove(tid) 776 if len(deps[dep]) == 0: 777 next.add(dep) 778 endpoints = next 779 #for tid in deps: 780 # if len(deps[tid]) != 0: 781 # bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid])) 782 783 # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to 784 # resolve these recursively until we aren't adding any further extra dependencies 785 extradeps = True 786 while extradeps: 787 extradeps = 0 788 for tid in recursivetasks: 789 tasknames = recursivetasks[tid] 790 791 totaldeps = set(self.runtaskentries[tid].depends) 792 if tid in recursiveitasks: 793 totaldeps.update(recursiveitasks[tid]) 794 for dep in recursiveitasks[tid]: 795 if dep not in self.runtaskentries: 796 continue 797 totaldeps.update(self.runtaskentries[dep].depends) 798 799 deps = set() 800 for dep in totaldeps: 801 if dep in cumulativedeps: 802 deps.update(cumulativedeps[dep]) 803 804 for t in deps: 805 for taskname in tasknames: 806 newtid = t + ":" + taskname 807 if newtid == tid: 808 continue 809 if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends: 810 extradeps += 1 811 self.runtaskentries[tid].depends.add(newtid) 812 813 # Handle recursive tasks which depend upon other recursive tasks 814 deps = set() 815 for dep in self.runtaskentries[tid].depends.intersection(recursivetasks): 816 deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends)) 817 for newtid in deps: 818 for taskname in tasknames: 819 if not newtid.endswith(":" + taskname): 820 continue 821 if newtid in self.runtaskentries: 822 extradeps += 1 823 self.runtaskentries[tid].depends.add(newtid) 824 825 bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps) 826 827 # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work 828 for tid in recursivetasksselfref: 829 self.runtaskentries[tid].depends.difference_update(recursivetasksselfref) 830 831 self.init_progress_reporter.next_stage() 832 833 #self.dump_data() 834 835 # Step B - Mark all active tasks 836 # 837 # Start with the tasks we were asked to run and mark all dependencies 838 # as active too. If the task is to be 'forced', clear its stamp. Once 839 # all active tasks are marked, prune the ones we don't need. 840 841 logger.verbose("Marking Active Tasks") 842 843 def mark_active(tid, depth): 844 """ 845 Mark an item as active along with its depends 846 (calls itself recursively) 847 """ 848 849 if tid in runq_build: 850 return 851 852 runq_build[tid] = 1 853 854 depends = self.runtaskentries[tid].depends 855 for depend in depends: 856 mark_active(depend, depth+1) 857 858 def invalidate_task(tid, error_nostamp): 859 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 860 taskdep = self.dataCaches[mc].task_deps[taskfn] 861 if fn + ":" + taskname not in taskData[mc].taskentries: 862 logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) 863 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 864 if error_nostamp: 865 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) 866 else: 867 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) 868 else: 869 logger.verbose("Invalidate task %s, %s", taskname, fn) 870 bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn) 871 872 self.target_tids = [] 873 for (mc, target, task, fn) in self.targets: 874 875 if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]: 876 continue 877 878 if target in taskData[mc].failed_deps: 879 continue 880 881 parents = False 882 if task.endswith('-'): 883 parents = True 884 task = task[:-1] 885 886 if fn in taskData[mc].failed_fns: 887 continue 888 889 # fn already has mc prefix 890 tid = fn + ":" + task 891 self.target_tids.append(tid) 892 if tid not in taskData[mc].taskentries: 893 import difflib 894 tasks = [] 895 for x in taskData[mc].taskentries: 896 if x.startswith(fn + ":"): 897 tasks.append(taskname_from_tid(x)) 898 close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7) 899 if close_matches: 900 extra = ". Close matches:\n %s" % "\n ".join(close_matches) 901 else: 902 extra = "" 903 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra)) 904 905 # For tasks called "XXXX-", ony run their dependencies 906 if parents: 907 for i in self.runtaskentries[tid].depends: 908 mark_active(i, 1) 909 else: 910 mark_active(tid, 1) 911 912 self.init_progress_reporter.next_stage() 913 914 # Step C - Prune all inactive tasks 915 # 916 # Once all active tasks are marked, prune the ones we don't need. 917 918 delcount = {} 919 for tid in list(self.runtaskentries.keys()): 920 if tid not in runq_build: 921 delcount[tid] = self.runtaskentries[tid] 922 del self.runtaskentries[tid] 923 924 # Handle --runall 925 if self.cooker.configuration.runall: 926 # re-run the mark_active and then drop unused tasks from new list 927 runq_build = {} 928 929 for task in self.cooker.configuration.runall: 930 if not task.startswith("do_"): 931 task = "do_{0}".format(task) 932 runall_tids = set() 933 for tid in list(self.runtaskentries): 934 wanttid = "{0}:{1}".format(fn_from_tid(tid), task) 935 if wanttid in delcount: 936 self.runtaskentries[wanttid] = delcount[wanttid] 937 if wanttid in self.runtaskentries: 938 runall_tids.add(wanttid) 939 940 for tid in list(runall_tids): 941 mark_active(tid,1) 942 if self.cooker.configuration.force: 943 invalidate_task(tid, False) 944 945 for tid in list(self.runtaskentries.keys()): 946 if tid not in runq_build: 947 delcount[tid] = self.runtaskentries[tid] 948 del self.runtaskentries[tid] 949 950 if len(self.runtaskentries) == 0: 951 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets))) 952 953 self.init_progress_reporter.next_stage() 954 955 # Handle runonly 956 if self.cooker.configuration.runonly: 957 # re-run the mark_active and then drop unused tasks from new list 958 runq_build = {} 959 960 for task in self.cooker.configuration.runonly: 961 if not task.startswith("do_"): 962 task = "do_{0}".format(task) 963 runonly_tids = { k: v for k, v in self.runtaskentries.items() if taskname_from_tid(k) == task } 964 965 for tid in list(runonly_tids): 966 mark_active(tid,1) 967 if self.cooker.configuration.force: 968 invalidate_task(tid, False) 969 970 for tid in list(self.runtaskentries.keys()): 971 if tid not in runq_build: 972 delcount[tid] = self.runtaskentries[tid] 973 del self.runtaskentries[tid] 974 975 if len(self.runtaskentries) == 0: 976 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets))) 977 978 # 979 # Step D - Sanity checks and computation 980 # 981 982 # Check to make sure we still have tasks to run 983 if len(self.runtaskentries) == 0: 984 if not taskData[''].abort: 985 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") 986 else: 987 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") 988 989 logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries)) 990 991 logger.verbose("Assign Weightings") 992 993 self.init_progress_reporter.next_stage() 994 995 # Generate a list of reverse dependencies to ease future calculations 996 for tid in self.runtaskentries: 997 for dep in self.runtaskentries[tid].depends: 998 self.runtaskentries[dep].revdeps.add(tid) 999 1000 self.init_progress_reporter.next_stage() 1001 1002 # Identify tasks at the end of dependency chains 1003 # Error on circular dependency loops (length two) 1004 endpoints = [] 1005 for tid in self.runtaskentries: 1006 revdeps = self.runtaskentries[tid].revdeps 1007 if len(revdeps) == 0: 1008 endpoints.append(tid) 1009 for dep in revdeps: 1010 if dep in self.runtaskentries[tid].depends: 1011 bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep)) 1012 1013 1014 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) 1015 1016 self.init_progress_reporter.next_stage() 1017 1018 # Calculate task weights 1019 # Check of higher length circular dependencies 1020 self.runq_weight = self.calculate_task_weights(endpoints) 1021 1022 self.init_progress_reporter.next_stage() 1023 1024 # Sanity Check - Check for multiple tasks building the same provider 1025 for mc in self.dataCaches: 1026 prov_list = {} 1027 seen_fn = [] 1028 for tid in self.runtaskentries: 1029 (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1030 if taskfn in seen_fn: 1031 continue 1032 if mc != tidmc: 1033 continue 1034 seen_fn.append(taskfn) 1035 for prov in self.dataCaches[mc].fn_provides[taskfn]: 1036 if prov not in prov_list: 1037 prov_list[prov] = [taskfn] 1038 elif taskfn not in prov_list[prov]: 1039 prov_list[prov].append(taskfn) 1040 for prov in prov_list: 1041 if len(prov_list[prov]) < 2: 1042 continue 1043 if prov in self.multi_provider_whitelist: 1044 continue 1045 seen_pn = [] 1046 # If two versions of the same PN are being built its fatal, we don't support it. 1047 for fn in prov_list[prov]: 1048 pn = self.dataCaches[mc].pkg_fn[fn] 1049 if pn not in seen_pn: 1050 seen_pn.append(pn) 1051 else: 1052 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) 1053 msg = "Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov])) 1054 # 1055 # Construct a list of things which uniquely depend on each provider 1056 # since this may help the user figure out which dependency is triggering this warning 1057 # 1058 msg += "\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from." 1059 deplist = {} 1060 commondeps = None 1061 for provfn in prov_list[prov]: 1062 deps = set() 1063 for tid in self.runtaskentries: 1064 fn = fn_from_tid(tid) 1065 if fn != provfn: 1066 continue 1067 for dep in self.runtaskentries[tid].revdeps: 1068 fn = fn_from_tid(dep) 1069 if fn == provfn: 1070 continue 1071 deps.add(dep) 1072 if not commondeps: 1073 commondeps = set(deps) 1074 else: 1075 commondeps &= deps 1076 deplist[provfn] = deps 1077 for provfn in deplist: 1078 msg += "\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps)) 1079 # 1080 # Construct a list of provides and runtime providers for each recipe 1081 # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC) 1082 # 1083 msg += "\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful." 1084 provide_results = {} 1085 rprovide_results = {} 1086 commonprovs = None 1087 commonrprovs = None 1088 for provfn in prov_list[prov]: 1089 provides = set(self.dataCaches[mc].fn_provides[provfn]) 1090 rprovides = set() 1091 for rprovide in self.dataCaches[mc].rproviders: 1092 if provfn in self.dataCaches[mc].rproviders[rprovide]: 1093 rprovides.add(rprovide) 1094 for package in self.dataCaches[mc].packages: 1095 if provfn in self.dataCaches[mc].packages[package]: 1096 rprovides.add(package) 1097 for package in self.dataCaches[mc].packages_dynamic: 1098 if provfn in self.dataCaches[mc].packages_dynamic[package]: 1099 rprovides.add(package) 1100 if not commonprovs: 1101 commonprovs = set(provides) 1102 else: 1103 commonprovs &= provides 1104 provide_results[provfn] = provides 1105 if not commonrprovs: 1106 commonrprovs = set(rprovides) 1107 else: 1108 commonrprovs &= rprovides 1109 rprovide_results[provfn] = rprovides 1110 #msg += "\nCommon provides:\n %s" % ("\n ".join(commonprovs)) 1111 #msg += "\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs)) 1112 for provfn in prov_list[prov]: 1113 msg += "\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs)) 1114 msg += "\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs)) 1115 1116 if self.warn_multi_bb: 1117 logger.verbnote(msg) 1118 else: 1119 logger.error(msg) 1120 1121 self.init_progress_reporter.next_stage() 1122 1123 # Create a whitelist usable by the stamp checks 1124 self.stampfnwhitelist = {} 1125 for mc in self.taskData: 1126 self.stampfnwhitelist[mc] = [] 1127 for entry in self.stampwhitelist.split(): 1128 if entry not in self.taskData[mc].build_targets: 1129 continue 1130 fn = self.taskData.build_targets[entry][0] 1131 self.stampfnwhitelist[mc].append(fn) 1132 1133 self.init_progress_reporter.next_stage() 1134 1135 # Iterate over the task list looking for tasks with a 'setscene' function 1136 self.runq_setscene_tids = set() 1137 if not self.cooker.configuration.nosetscene: 1138 for tid in self.runtaskentries: 1139 (mc, fn, taskname, _) = split_tid_mcfn(tid) 1140 setscenetid = tid + "_setscene" 1141 if setscenetid not in taskData[mc].taskentries: 1142 continue 1143 self.runq_setscene_tids.add(tid) 1144 1145 self.init_progress_reporter.next_stage() 1146 1147 # Invalidate task if force mode active 1148 if self.cooker.configuration.force: 1149 for tid in self.target_tids: 1150 invalidate_task(tid, False) 1151 1152 # Invalidate task if invalidate mode active 1153 if self.cooker.configuration.invalidate_stamp: 1154 for tid in self.target_tids: 1155 fn = fn_from_tid(tid) 1156 for st in self.cooker.configuration.invalidate_stamp.split(','): 1157 if not st.startswith("do_"): 1158 st = "do_%s" % st 1159 invalidate_task(fn + ":" + st, True) 1160 1161 self.init_progress_reporter.next_stage() 1162 1163 # Create and print to the logs a virtual/xxxx -> PN (fn) table 1164 for mc in taskData: 1165 virtmap = taskData[mc].get_providermap(prefix="virtual/") 1166 virtpnmap = {} 1167 for v in virtmap: 1168 virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]] 1169 bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v])) 1170 if hasattr(bb.parse.siggen, "tasks_resolved"): 1171 bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc]) 1172 1173 self.init_progress_reporter.next_stage() 1174 1175 bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) 1176 1177 # Iterate over the task list and call into the siggen code 1178 dealtwith = set() 1179 todeal = set(self.runtaskentries) 1180 while len(todeal) > 0: 1181 for tid in todeal.copy(): 1182 if len(self.runtaskentries[tid].depends - dealtwith) == 0: 1183 dealtwith.add(tid) 1184 todeal.remove(tid) 1185 self.prepare_task_hash(tid) 1186 1187 bb.parse.siggen.writeout_file_checksum_cache() 1188 1189 #self.dump_data() 1190 return len(self.runtaskentries) 1191 1192 def prepare_task_hash(self, tid): 1193 dc = bb.parse.siggen.get_data_caches(self.dataCaches, mc_from_tid(tid)) 1194 bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, dc) 1195 self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, dc) 1196 self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) 1197 1198 def dump_data(self): 1199 """ 1200 Dump some debug information on the internal data structures 1201 """ 1202 logger.debug3("run_tasks:") 1203 for tid in self.runtaskentries: 1204 logger.debug3(" %s: %s Deps %s RevDeps %s", tid, 1205 self.runtaskentries[tid].weight, 1206 self.runtaskentries[tid].depends, 1207 self.runtaskentries[tid].revdeps) 1208 1209class RunQueueWorker(): 1210 def __init__(self, process, pipe): 1211 self.process = process 1212 self.pipe = pipe 1213 1214class RunQueue: 1215 def __init__(self, cooker, cfgData, dataCaches, taskData, targets): 1216 1217 self.cooker = cooker 1218 self.cfgData = cfgData 1219 self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets) 1220 1221 self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY") or "perfile" 1222 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None 1223 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None 1224 1225 self.state = runQueuePrepare 1226 1227 # For disk space monitor 1228 # Invoked at regular time intervals via the bitbake heartbeat event 1229 # while the build is running. We generate a unique name for the handler 1230 # here, just in case that there ever is more than one RunQueue instance, 1231 # start the handler when reaching runQueueSceneInit, and stop it when 1232 # done with the build. 1233 self.dm = monitordisk.diskMonitor(cfgData) 1234 self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self)) 1235 self.dm_event_handler_registered = False 1236 self.rqexe = None 1237 self.worker = {} 1238 self.fakeworker = {} 1239 1240 def _start_worker(self, mc, fakeroot = False, rqexec = None): 1241 logger.debug("Starting bitbake-worker") 1242 magic = "decafbad" 1243 if self.cooker.configuration.profile: 1244 magic = "decafbadbad" 1245 fakerootlogs = None 1246 if fakeroot: 1247 magic = magic + "beef" 1248 mcdata = self.cooker.databuilder.mcdata[mc] 1249 fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD")) 1250 fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split() 1251 env = os.environ.copy() 1252 for key, value in (var.split('=') for var in fakerootenv): 1253 env[key] = value 1254 worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) 1255 fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs 1256 else: 1257 worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) 1258 bb.utils.nonblockingfd(worker.stdout) 1259 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs) 1260 1261 workerdata = { 1262 "taskdeps" : self.rqdata.dataCaches[mc].task_deps, 1263 "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv, 1264 "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs, 1265 "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv, 1266 "sigdata" : bb.parse.siggen.get_taskdata(), 1267 "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, 1268 "build_verbose_shell" : self.cooker.configuration.build_verbose_shell, 1269 "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout, 1270 "logdefaultdomain" : bb.msg.loggerDefaultDomains, 1271 "prhost" : self.cooker.prhost, 1272 "buildname" : self.cfgData.getVar("BUILDNAME"), 1273 "date" : self.cfgData.getVar("DATE"), 1274 "time" : self.cfgData.getVar("TIME"), 1275 "hashservaddr" : self.cooker.hashservaddr, 1276 "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), 1277 } 1278 1279 worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") 1280 worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") 1281 worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") 1282 worker.stdin.flush() 1283 1284 return RunQueueWorker(worker, workerpipe) 1285 1286 def _teardown_worker(self, worker): 1287 if not worker: 1288 return 1289 logger.debug("Teardown for bitbake-worker") 1290 try: 1291 worker.process.stdin.write(b"<quit></quit>") 1292 worker.process.stdin.flush() 1293 worker.process.stdin.close() 1294 except IOError: 1295 pass 1296 while worker.process.returncode is None: 1297 worker.pipe.read() 1298 worker.process.poll() 1299 while worker.pipe.read(): 1300 continue 1301 worker.pipe.close() 1302 1303 def start_worker(self): 1304 if self.worker: 1305 self.teardown_workers() 1306 self.teardown = False 1307 for mc in self.rqdata.dataCaches: 1308 self.worker[mc] = self._start_worker(mc) 1309 1310 def start_fakeworker(self, rqexec, mc): 1311 if not mc in self.fakeworker: 1312 self.fakeworker[mc] = self._start_worker(mc, True, rqexec) 1313 1314 def teardown_workers(self): 1315 self.teardown = True 1316 for mc in self.worker: 1317 self._teardown_worker(self.worker[mc]) 1318 self.worker = {} 1319 for mc in self.fakeworker: 1320 self._teardown_worker(self.fakeworker[mc]) 1321 self.fakeworker = {} 1322 1323 def read_workers(self): 1324 for mc in self.worker: 1325 self.worker[mc].pipe.read() 1326 for mc in self.fakeworker: 1327 self.fakeworker[mc].pipe.read() 1328 1329 def active_fds(self): 1330 fds = [] 1331 for mc in self.worker: 1332 fds.append(self.worker[mc].pipe.input) 1333 for mc in self.fakeworker: 1334 fds.append(self.fakeworker[mc].pipe.input) 1335 return fds 1336 1337 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): 1338 def get_timestamp(f): 1339 try: 1340 if not os.access(f, os.F_OK): 1341 return None 1342 return os.stat(f)[stat.ST_MTIME] 1343 except: 1344 return None 1345 1346 (mc, fn, tn, taskfn) = split_tid_mcfn(tid) 1347 if taskname is None: 1348 taskname = tn 1349 1350 if self.stamppolicy == "perfile": 1351 fulldeptree = False 1352 else: 1353 fulldeptree = True 1354 stampwhitelist = [] 1355 if self.stamppolicy == "whitelist": 1356 stampwhitelist = self.rqdata.stampfnwhitelist[mc] 1357 1358 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn) 1359 1360 # If the stamp is missing, it's not current 1361 if not os.access(stampfile, os.F_OK): 1362 logger.debug2("Stampfile %s not available", stampfile) 1363 return False 1364 # If it's a 'nostamp' task, it's not current 1365 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1366 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 1367 logger.debug2("%s.%s is nostamp\n", fn, taskname) 1368 return False 1369 1370 if taskname != "do_setscene" and taskname.endswith("_setscene"): 1371 return True 1372 1373 if cache is None: 1374 cache = {} 1375 1376 iscurrent = True 1377 t1 = get_timestamp(stampfile) 1378 for dep in self.rqdata.runtaskentries[tid].depends: 1379 if iscurrent: 1380 (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep) 1381 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2) 1382 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2) 1383 t2 = get_timestamp(stampfile2) 1384 t3 = get_timestamp(stampfile3) 1385 if t3 and not t2: 1386 continue 1387 if t3 and t3 > t2: 1388 continue 1389 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): 1390 if not t2: 1391 logger.debug2('Stampfile %s does not exist', stampfile2) 1392 iscurrent = False 1393 break 1394 if t1 < t2: 1395 logger.debug2('Stampfile %s < %s', stampfile, stampfile2) 1396 iscurrent = False 1397 break 1398 if recurse and iscurrent: 1399 if dep in cache: 1400 iscurrent = cache[dep] 1401 if not iscurrent: 1402 logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) 1403 else: 1404 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) 1405 cache[dep] = iscurrent 1406 if recurse: 1407 cache[tid] = iscurrent 1408 return iscurrent 1409 1410 def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True): 1411 valid = set() 1412 if self.hashvalidate: 1413 sq_data = {} 1414 sq_data['hash'] = {} 1415 sq_data['hashfn'] = {} 1416 sq_data['unihash'] = {} 1417 for tid in tocheck: 1418 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1419 sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash 1420 sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn] 1421 sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash 1422 1423 valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary) 1424 1425 return valid 1426 1427 def validate_hash(self, sq_data, d, siginfo, currentcount, summary): 1428 locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary} 1429 1430 # Metadata has **kwargs so args can be added, sq_data can also gain new fields 1431 call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)" 1432 1433 return bb.utils.better_eval(call, locs) 1434 1435 def _execute_runqueue(self): 1436 """ 1437 Run the tasks in a queue prepared by rqdata.prepare() 1438 Upon failure, optionally try to recover the build using any alternate providers 1439 (if the abort on failure configuration option isn't set) 1440 """ 1441 1442 retval = True 1443 1444 if self.state is runQueuePrepare: 1445 # NOTE: if you add, remove or significantly refactor the stages of this 1446 # process then you should recalculate the weightings here. This is quite 1447 # easy to do - just change the next line temporarily to pass debug=True as 1448 # the last parameter and you'll get a printout of the weightings as well 1449 # as a map to the lines where next_stage() was called. Of course this isn't 1450 # critical, but it helps to keep the progress reporting accurate. 1451 self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data, 1452 "Initialising tasks", 1453 [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244]) 1454 if self.rqdata.prepare() == 0: 1455 self.state = runQueueComplete 1456 else: 1457 self.state = runQueueSceneInit 1458 bb.parse.siggen.save_unitaskhashes() 1459 1460 if self.state is runQueueSceneInit: 1461 self.rqdata.init_progress_reporter.next_stage() 1462 1463 # we are ready to run, emit dependency info to any UI or class which 1464 # needs it 1465 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) 1466 self.rqdata.init_progress_reporter.next_stage() 1467 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) 1468 1469 if not self.dm_event_handler_registered: 1470 res = bb.event.register(self.dm_event_handler_name, 1471 lambda x: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False, 1472 ('bb.event.HeartbeatEvent',), data=self.cfgData) 1473 self.dm_event_handler_registered = True 1474 1475 dump = self.cooker.configuration.dump_signatures 1476 if dump: 1477 self.rqdata.init_progress_reporter.finish() 1478 if 'printdiff' in dump: 1479 invalidtasks = self.print_diffscenetasks() 1480 self.dump_signatures(dump) 1481 if 'printdiff' in dump: 1482 self.write_diffscenetasks(invalidtasks) 1483 self.state = runQueueComplete 1484 1485 if self.state is runQueueSceneInit: 1486 self.rqdata.init_progress_reporter.next_stage() 1487 self.start_worker() 1488 self.rqdata.init_progress_reporter.next_stage() 1489 self.rqexe = RunQueueExecute(self) 1490 1491 # If we don't have any setscene functions, skip execution 1492 if len(self.rqdata.runq_setscene_tids) == 0: 1493 logger.info('No setscene tasks') 1494 for tid in self.rqdata.runtaskentries: 1495 if len(self.rqdata.runtaskentries[tid].depends) == 0: 1496 self.rqexe.setbuildable(tid) 1497 self.rqexe.tasks_notcovered.add(tid) 1498 self.rqexe.sqdone = True 1499 logger.info('Executing Tasks') 1500 self.state = runQueueRunning 1501 1502 if self.state is runQueueRunning: 1503 retval = self.rqexe.execute() 1504 1505 if self.state is runQueueCleanUp: 1506 retval = self.rqexe.finish() 1507 1508 build_done = self.state is runQueueComplete or self.state is runQueueFailed 1509 1510 if build_done and self.dm_event_handler_registered: 1511 bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData) 1512 self.dm_event_handler_registered = False 1513 1514 if build_done and self.rqexe: 1515 bb.parse.siggen.save_unitaskhashes() 1516 self.teardown_workers() 1517 if self.rqexe: 1518 if self.rqexe.stats.failed: 1519 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) 1520 else: 1521 # Let's avoid the word "failed" if nothing actually did 1522 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) 1523 1524 if self.state is runQueueFailed: 1525 raise bb.runqueue.TaskFailure(self.rqexe.failed_tids) 1526 1527 if self.state is runQueueComplete: 1528 # All done 1529 return False 1530 1531 # Loop 1532 return retval 1533 1534 def execute_runqueue(self): 1535 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop. 1536 try: 1537 return self._execute_runqueue() 1538 except bb.runqueue.TaskFailure: 1539 raise 1540 except SystemExit: 1541 raise 1542 except bb.BBHandledException: 1543 try: 1544 self.teardown_workers() 1545 except: 1546 pass 1547 self.state = runQueueComplete 1548 raise 1549 except Exception as err: 1550 logger.exception("An uncaught exception occurred in runqueue") 1551 try: 1552 self.teardown_workers() 1553 except: 1554 pass 1555 self.state = runQueueComplete 1556 raise 1557 1558 def finish_runqueue(self, now = False): 1559 if not self.rqexe: 1560 self.state = runQueueComplete 1561 return 1562 1563 if now: 1564 self.rqexe.finish_now() 1565 else: 1566 self.rqexe.finish() 1567 1568 def rq_dump_sigfn(self, fn, options): 1569 bb_cache = bb.cache.NoCache(self.cooker.databuilder) 1570 mc = bb.runqueue.mc_from_tid(fn) 1571 the_data = bb_cache.loadDataFull(fn, self.cooker.collections[mc].get_file_appends(fn)) 1572 siggen = bb.parse.siggen 1573 dataCaches = self.rqdata.dataCaches 1574 siggen.dump_sigfn(fn, dataCaches, options) 1575 1576 def dump_signatures(self, options): 1577 fns = set() 1578 bb.note("Reparsing files to collect dependency data") 1579 1580 for tid in self.rqdata.runtaskentries: 1581 fn = fn_from_tid(tid) 1582 fns.add(fn) 1583 1584 max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1) 1585 # We cannot use the real multiprocessing.Pool easily due to some local data 1586 # that can't be pickled. This is a cheap multi-process solution. 1587 launched = [] 1588 while fns: 1589 if len(launched) < max_process: 1590 p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options)) 1591 p.start() 1592 launched.append(p) 1593 for q in launched: 1594 # The finished processes are joined when calling is_alive() 1595 if not q.is_alive(): 1596 launched.remove(q) 1597 for p in launched: 1598 p.join() 1599 1600 bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options) 1601 1602 return 1603 1604 def print_diffscenetasks(self): 1605 1606 noexec = [] 1607 tocheck = set() 1608 1609 for tid in self.rqdata.runtaskentries: 1610 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1611 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1612 1613 if 'noexec' in taskdep and taskname in taskdep['noexec']: 1614 noexec.append(tid) 1615 continue 1616 1617 tocheck.add(tid) 1618 1619 valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False) 1620 1621 # Tasks which are both setscene and noexec never care about dependencies 1622 # We therefore find tasks which are setscene and noexec and mark their 1623 # unique dependencies as valid. 1624 for tid in noexec: 1625 if tid not in self.rqdata.runq_setscene_tids: 1626 continue 1627 for dep in self.rqdata.runtaskentries[tid].depends: 1628 hasnoexecparents = True 1629 for dep2 in self.rqdata.runtaskentries[dep].revdeps: 1630 if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec: 1631 continue 1632 hasnoexecparents = False 1633 break 1634 if hasnoexecparents: 1635 valid_new.add(dep) 1636 1637 invalidtasks = set() 1638 for tid in self.rqdata.runtaskentries: 1639 if tid not in valid_new and tid not in noexec: 1640 invalidtasks.add(tid) 1641 1642 found = set() 1643 processed = set() 1644 for tid in invalidtasks: 1645 toprocess = set([tid]) 1646 while toprocess: 1647 next = set() 1648 for t in toprocess: 1649 for dep in self.rqdata.runtaskentries[t].depends: 1650 if dep in invalidtasks: 1651 found.add(tid) 1652 if dep not in processed: 1653 processed.add(dep) 1654 next.add(dep) 1655 toprocess = next 1656 if tid in found: 1657 toprocess = set() 1658 1659 tasklist = [] 1660 for tid in invalidtasks.difference(found): 1661 tasklist.append(tid) 1662 1663 if tasklist: 1664 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) 1665 1666 return invalidtasks.difference(found) 1667 1668 def write_diffscenetasks(self, invalidtasks): 1669 1670 # Define recursion callback 1671 def recursecb(key, hash1, hash2): 1672 hashes = [hash1, hash2] 1673 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) 1674 1675 recout = [] 1676 if len(hashfiles) == 2: 1677 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb) 1678 recout.extend(list(' ' + l for l in out2)) 1679 else: 1680 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) 1681 1682 return recout 1683 1684 1685 for tid in invalidtasks: 1686 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1687 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1688 h = self.rqdata.runtaskentries[tid].hash 1689 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData) 1690 match = None 1691 for m in matches: 1692 if h in m: 1693 match = m 1694 if match is None: 1695 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h) 1696 matches = {k : v for k, v in iter(matches.items()) if h not in k} 1697 if matches: 1698 latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1] 1699 prevh = __find_sha256__.search(latestmatch).group(0) 1700 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) 1701 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) 1702 1703 1704class RunQueueExecute: 1705 1706 def __init__(self, rq): 1707 self.rq = rq 1708 self.cooker = rq.cooker 1709 self.cfgData = rq.cfgData 1710 self.rqdata = rq.rqdata 1711 1712 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) 1713 self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" 1714 1715 self.sq_buildable = set() 1716 self.sq_running = set() 1717 self.sq_live = set() 1718 1719 self.updated_taskhash_queue = [] 1720 self.pending_migrations = set() 1721 1722 self.runq_buildable = set() 1723 self.runq_running = set() 1724 self.runq_complete = set() 1725 self.runq_tasksrun = set() 1726 1727 self.build_stamps = {} 1728 self.build_stamps2 = [] 1729 self.failed_tids = [] 1730 self.sq_deferred = {} 1731 1732 self.stampcache = {} 1733 1734 self.holdoff_tasks = set() 1735 self.holdoff_need_update = True 1736 self.sqdone = False 1737 1738 self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) 1739 self.sq_stats = RunQueueStats(len(self.rqdata.runq_setscene_tids)) 1740 1741 for mc in rq.worker: 1742 rq.worker[mc].pipe.setrunqueueexec(self) 1743 for mc in rq.fakeworker: 1744 rq.fakeworker[mc].pipe.setrunqueueexec(self) 1745 1746 if self.number_tasks <= 0: 1747 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) 1748 1749 # List of setscene tasks which we've covered 1750 self.scenequeue_covered = set() 1751 # List of tasks which are covered (including setscene ones) 1752 self.tasks_covered = set() 1753 self.tasks_scenequeue_done = set() 1754 self.scenequeue_notcovered = set() 1755 self.tasks_notcovered = set() 1756 self.scenequeue_notneeded = set() 1757 1758 # We can't skip specified target tasks which aren't setscene tasks 1759 self.cantskip = set(self.rqdata.target_tids) 1760 self.cantskip.difference_update(self.rqdata.runq_setscene_tids) 1761 self.cantskip.intersection_update(self.rqdata.runtaskentries) 1762 1763 schedulers = self.get_schedulers() 1764 for scheduler in schedulers: 1765 if self.scheduler == scheduler.name: 1766 self.sched = scheduler(self, self.rqdata) 1767 logger.debug("Using runqueue scheduler '%s'", scheduler.name) 1768 break 1769 else: 1770 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1771 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1772 1773 #if len(self.rqdata.runq_setscene_tids) > 0: 1774 self.sqdata = SQData() 1775 build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self) 1776 1777 def runqueue_process_waitpid(self, task, status, fakerootlog=None): 1778 1779 # self.build_stamps[pid] may not exist when use shared work directory. 1780 if task in self.build_stamps: 1781 self.build_stamps2.remove(self.build_stamps[task]) 1782 del self.build_stamps[task] 1783 1784 if task in self.sq_live: 1785 if status != 0: 1786 self.sq_task_fail(task, status) 1787 else: 1788 self.sq_task_complete(task) 1789 self.sq_live.remove(task) 1790 else: 1791 if status != 0: 1792 self.task_fail(task, status, fakerootlog=fakerootlog) 1793 else: 1794 self.task_complete(task) 1795 return True 1796 1797 def finish_now(self): 1798 for mc in self.rq.worker: 1799 try: 1800 self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") 1801 self.rq.worker[mc].process.stdin.flush() 1802 except IOError: 1803 # worker must have died? 1804 pass 1805 for mc in self.rq.fakeworker: 1806 try: 1807 self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") 1808 self.rq.fakeworker[mc].process.stdin.flush() 1809 except IOError: 1810 # worker must have died? 1811 pass 1812 1813 if len(self.failed_tids) != 0: 1814 self.rq.state = runQueueFailed 1815 return 1816 1817 self.rq.state = runQueueComplete 1818 return 1819 1820 def finish(self): 1821 self.rq.state = runQueueCleanUp 1822 1823 active = self.stats.active + self.sq_stats.active 1824 if active > 0: 1825 bb.event.fire(runQueueExitWait(active), self.cfgData) 1826 self.rq.read_workers() 1827 return self.rq.active_fds() 1828 1829 if len(self.failed_tids) != 0: 1830 self.rq.state = runQueueFailed 1831 return True 1832 1833 self.rq.state = runQueueComplete 1834 return True 1835 1836 # Used by setscene only 1837 def check_dependencies(self, task, taskdeps): 1838 if not self.rq.depvalidate: 1839 return False 1840 1841 # Must not edit parent data 1842 taskdeps = set(taskdeps) 1843 1844 taskdata = {} 1845 taskdeps.add(task) 1846 for dep in taskdeps: 1847 (mc, fn, taskname, taskfn) = split_tid_mcfn(dep) 1848 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1849 taskdata[dep] = [pn, taskname, fn] 1850 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)" 1851 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data } 1852 valid = bb.utils.better_eval(call, locs) 1853 return valid 1854 1855 def can_start_task(self): 1856 active = self.stats.active + self.sq_stats.active 1857 can_start = active < self.number_tasks 1858 return can_start 1859 1860 def get_schedulers(self): 1861 schedulers = set(obj for obj in globals().values() 1862 if type(obj) is type and 1863 issubclass(obj, RunQueueScheduler)) 1864 1865 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS") 1866 if user_schedulers: 1867 for sched in user_schedulers.split(): 1868 if not "." in sched: 1869 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) 1870 continue 1871 1872 modname, name = sched.rsplit(".", 1) 1873 try: 1874 module = __import__(modname, fromlist=(name,)) 1875 except ImportError as exc: 1876 logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) 1877 raise SystemExit(1) 1878 else: 1879 schedulers.add(getattr(module, name)) 1880 return schedulers 1881 1882 def setbuildable(self, task): 1883 self.runq_buildable.add(task) 1884 self.sched.newbuildable(task) 1885 1886 def task_completeoutright(self, task): 1887 """ 1888 Mark a task as completed 1889 Look at the reverse dependencies and mark any task with 1890 completed dependencies as buildable 1891 """ 1892 self.runq_complete.add(task) 1893 for revdep in self.rqdata.runtaskentries[task].revdeps: 1894 if revdep in self.runq_running: 1895 continue 1896 if revdep in self.runq_buildable: 1897 continue 1898 alldeps = True 1899 for dep in self.rqdata.runtaskentries[revdep].depends: 1900 if dep not in self.runq_complete: 1901 alldeps = False 1902 break 1903 if alldeps: 1904 self.setbuildable(revdep) 1905 logger.debug("Marking task %s as buildable", revdep) 1906 1907 def task_complete(self, task): 1908 self.stats.taskCompleted() 1909 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 1910 self.task_completeoutright(task) 1911 self.runq_tasksrun.add(task) 1912 1913 def task_fail(self, task, exitcode, fakerootlog=None): 1914 """ 1915 Called when a task has failed 1916 Updates the state engine with the failure 1917 """ 1918 self.stats.taskFailed() 1919 self.failed_tids.append(task) 1920 1921 fakeroot_log = "" 1922 if fakerootlog and os.path.exists(fakerootlog): 1923 with open(fakerootlog) as fakeroot_log_file: 1924 fakeroot_failed = False 1925 for line in reversed(fakeroot_log_file.readlines()): 1926 for fakeroot_error in ['mismatch', 'error', 'fatal']: 1927 if fakeroot_error in line.lower(): 1928 fakeroot_failed = True 1929 if 'doing new pid setup and server start' in line: 1930 break 1931 fakeroot_log = line + fakeroot_log 1932 1933 if not fakeroot_failed: 1934 fakeroot_log = None 1935 1936 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=fakeroot_log), self.cfgData) 1937 1938 if self.rqdata.taskData[''].abort: 1939 self.rq.state = runQueueCleanUp 1940 1941 def task_skip(self, task, reason): 1942 self.runq_running.add(task) 1943 self.setbuildable(task) 1944 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData) 1945 self.task_completeoutright(task) 1946 self.stats.taskSkipped() 1947 self.stats.taskCompleted() 1948 1949 def summarise_scenequeue_errors(self): 1950 err = False 1951 if not self.sqdone: 1952 logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) 1953 completeevent = sceneQueueComplete(self.sq_stats, self.rq) 1954 bb.event.fire(completeevent, self.cfgData) 1955 if self.sq_deferred: 1956 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) 1957 err = True 1958 if self.updated_taskhash_queue: 1959 logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) 1960 err = True 1961 if self.holdoff_tasks: 1962 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) 1963 err = True 1964 1965 for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered): 1966 # No task should end up in both covered and uncovered, that is a bug. 1967 logger.error("Setscene task %s in both covered and notcovered." % tid) 1968 1969 for tid in self.rqdata.runq_setscene_tids: 1970 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 1971 err = True 1972 logger.error("Setscene Task %s was never marked as covered or not covered" % tid) 1973 if tid not in self.sq_buildable: 1974 err = True 1975 logger.error("Setscene Task %s was never marked as buildable" % tid) 1976 if tid not in self.sq_running: 1977 err = True 1978 logger.error("Setscene Task %s was never marked as running" % tid) 1979 1980 for x in self.rqdata.runtaskentries: 1981 if x not in self.tasks_covered and x not in self.tasks_notcovered: 1982 logger.error("Task %s was never moved from the setscene queue" % x) 1983 err = True 1984 if x not in self.tasks_scenequeue_done: 1985 logger.error("Task %s was never processed by the setscene code" % x) 1986 err = True 1987 if len(self.rqdata.runtaskentries[x].depends) == 0 and x not in self.runq_buildable: 1988 logger.error("Task %s was never marked as buildable by the setscene code" % x) 1989 err = True 1990 return err 1991 1992 1993 def execute(self): 1994 """ 1995 Run the tasks in a queue prepared by prepare_runqueue 1996 """ 1997 1998 self.rq.read_workers() 1999 if self.updated_taskhash_queue or self.pending_migrations: 2000 self.process_possible_migrations() 2001 2002 if not hasattr(self, "sorted_setscene_tids"): 2003 # Don't want to sort this set every execution 2004 self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids) 2005 2006 task = None 2007 if not self.sqdone and self.can_start_task(): 2008 # Find the next setscene to run 2009 for nexttask in self.sorted_setscene_tids: 2010 if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values(): 2011 if nexttask not in self.sqdata.unskippable and len(self.sqdata.sq_revdeps[nexttask]) > 0 and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): 2012 if nexttask not in self.rqdata.target_tids: 2013 logger.debug2("Skipping setscene for task %s" % nexttask) 2014 self.sq_task_skip(nexttask) 2015 self.scenequeue_notneeded.add(nexttask) 2016 if nexttask in self.sq_deferred: 2017 del self.sq_deferred[nexttask] 2018 return True 2019 # If covered tasks are running, need to wait for them to complete 2020 for t in self.sqdata.sq_covered_tasks[nexttask]: 2021 if t in self.runq_running and t not in self.runq_complete: 2022 continue 2023 if nexttask in self.sq_deferred: 2024 if self.sq_deferred[nexttask] not in self.runq_complete: 2025 continue 2026 logger.debug("Task %s no longer deferred" % nexttask) 2027 del self.sq_deferred[nexttask] 2028 valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False) 2029 if not valid: 2030 logger.debug("%s didn't become valid, skipping setscene" % nexttask) 2031 self.sq_task_failoutright(nexttask) 2032 return True 2033 if nexttask in self.sqdata.outrightfail: 2034 logger.debug2('No package found, so skipping setscene task %s', nexttask) 2035 self.sq_task_failoutright(nexttask) 2036 return True 2037 if nexttask in self.sqdata.unskippable: 2038 logger.debug2("Setscene task %s is unskippable" % nexttask) 2039 task = nexttask 2040 break 2041 if task is not None: 2042 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2043 taskname = taskname + "_setscene" 2044 if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): 2045 logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task) 2046 self.sq_task_failoutright(task) 2047 return True 2048 2049 if self.cooker.configuration.force: 2050 if task in self.rqdata.target_tids: 2051 self.sq_task_failoutright(task) 2052 return True 2053 2054 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2055 logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task) 2056 self.sq_task_skip(task) 2057 return True 2058 2059 if self.cooker.configuration.skipsetscene: 2060 logger.debug2('No setscene tasks should be executed. Skipping %s', task) 2061 self.sq_task_failoutright(task) 2062 return True 2063 2064 startevent = sceneQueueTaskStarted(task, self.sq_stats, self.rq) 2065 bb.event.fire(startevent, self.cfgData) 2066 2067 taskdepdata = self.sq_build_taskdepdata(task) 2068 2069 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2070 taskhash = self.rqdata.get_task_hash(task) 2071 unihash = self.rqdata.get_task_unihash(task) 2072 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2073 if not mc in self.rq.fakeworker: 2074 self.rq.start_fakeworker(self, mc) 2075 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") 2076 self.rq.fakeworker[mc].process.stdin.flush() 2077 else: 2078 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") 2079 self.rq.worker[mc].process.stdin.flush() 2080 2081 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 2082 self.build_stamps2.append(self.build_stamps[task]) 2083 self.sq_running.add(task) 2084 self.sq_live.add(task) 2085 self.sq_stats.taskActive() 2086 if self.can_start_task(): 2087 return True 2088 2089 self.update_holdofftasks() 2090 2091 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: 2092 hashequiv_logger.verbose("Setscene tasks completed") 2093 2094 err = self.summarise_scenequeue_errors() 2095 if err: 2096 self.rq.state = runQueueFailed 2097 return True 2098 2099 if self.cooker.configuration.setsceneonly: 2100 self.rq.state = runQueueComplete 2101 return True 2102 self.sqdone = True 2103 2104 if self.stats.total == 0: 2105 # nothing to do 2106 self.rq.state = runQueueComplete 2107 return True 2108 2109 if self.cooker.configuration.setsceneonly: 2110 task = None 2111 else: 2112 task = self.sched.next() 2113 if task is not None: 2114 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2115 2116 if self.rqdata.setscenewhitelist is not None: 2117 if self.check_setscenewhitelist(task): 2118 self.task_fail(task, "setscene whitelist") 2119 return True 2120 2121 if task in self.tasks_covered: 2122 logger.debug2("Setscene covered task %s", task) 2123 self.task_skip(task, "covered") 2124 return True 2125 2126 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2127 logger.debug2("Stamp current task %s", task) 2128 2129 self.task_skip(task, "existing") 2130 self.runq_tasksrun.add(task) 2131 return True 2132 2133 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2134 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2135 startevent = runQueueTaskStarted(task, self.stats, self.rq, 2136 noexec=True) 2137 bb.event.fire(startevent, self.cfgData) 2138 self.runq_running.add(task) 2139 self.stats.taskActive() 2140 if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2141 bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn) 2142 self.task_complete(task) 2143 return True 2144 else: 2145 startevent = runQueueTaskStarted(task, self.stats, self.rq) 2146 bb.event.fire(startevent, self.cfgData) 2147 2148 taskdepdata = self.build_taskdepdata(task) 2149 2150 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2151 taskhash = self.rqdata.get_task_hash(task) 2152 unihash = self.rqdata.get_task_unihash(task) 2153 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2154 if not mc in self.rq.fakeworker: 2155 try: 2156 self.rq.start_fakeworker(self, mc) 2157 except OSError as exc: 2158 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) 2159 self.rq.state = runQueueFailed 2160 self.stats.taskFailed() 2161 return True 2162 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") 2163 self.rq.fakeworker[mc].process.stdin.flush() 2164 else: 2165 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") 2166 self.rq.worker[mc].process.stdin.flush() 2167 2168 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) 2169 self.build_stamps2.append(self.build_stamps[task]) 2170 self.runq_running.add(task) 2171 self.stats.taskActive() 2172 if self.can_start_task(): 2173 return True 2174 2175 if self.stats.active > 0 or self.sq_stats.active > 0: 2176 self.rq.read_workers() 2177 return self.rq.active_fds() 2178 2179 # No more tasks can be run. If we have deferred setscene tasks we should run them. 2180 if self.sq_deferred: 2181 tid = self.sq_deferred.pop(list(self.sq_deferred.keys())[0]) 2182 logger.warning("Runqeueue deadlocked on deferred tasks, forcing task %s" % tid) 2183 self.sq_task_failoutright(tid) 2184 return True 2185 2186 if len(self.failed_tids) != 0: 2187 self.rq.state = runQueueFailed 2188 return True 2189 2190 # Sanity Checks 2191 err = self.summarise_scenequeue_errors() 2192 for task in self.rqdata.runtaskentries: 2193 if task not in self.runq_buildable: 2194 logger.error("Task %s never buildable!", task) 2195 err = True 2196 elif task not in self.runq_running: 2197 logger.error("Task %s never ran!", task) 2198 err = True 2199 elif task not in self.runq_complete: 2200 logger.error("Task %s never completed!", task) 2201 err = True 2202 2203 if err: 2204 self.rq.state = runQueueFailed 2205 else: 2206 self.rq.state = runQueueComplete 2207 2208 return True 2209 2210 def filtermcdeps(self, task, mc, deps): 2211 ret = set() 2212 for dep in deps: 2213 thismc = mc_from_tid(dep) 2214 if thismc != mc: 2215 continue 2216 ret.add(dep) 2217 return ret 2218 2219 # We filter out multiconfig dependencies from taskdepdata we pass to the tasks 2220 # as most code can't handle them 2221 def build_taskdepdata(self, task): 2222 taskdepdata = {} 2223 mc = mc_from_tid(task) 2224 next = self.rqdata.runtaskentries[task].depends.copy() 2225 next.add(task) 2226 next = self.filtermcdeps(task, mc, next) 2227 while next: 2228 additional = [] 2229 for revdep in next: 2230 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2231 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2232 deps = self.rqdata.runtaskentries[revdep].depends 2233 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2234 taskhash = self.rqdata.runtaskentries[revdep].hash 2235 unihash = self.rqdata.runtaskentries[revdep].unihash 2236 deps = self.filtermcdeps(task, mc, deps) 2237 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] 2238 for revdep2 in deps: 2239 if revdep2 not in taskdepdata: 2240 additional.append(revdep2) 2241 next = additional 2242 2243 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2244 return taskdepdata 2245 2246 def update_holdofftasks(self): 2247 2248 if not self.holdoff_need_update: 2249 return 2250 2251 notcovered = set(self.scenequeue_notcovered) 2252 notcovered |= self.cantskip 2253 for tid in self.scenequeue_notcovered: 2254 notcovered |= self.sqdata.sq_covered_tasks[tid] 2255 notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) 2256 notcovered.intersection_update(self.tasks_scenequeue_done) 2257 2258 covered = set(self.scenequeue_covered) 2259 for tid in self.scenequeue_covered: 2260 covered |= self.sqdata.sq_covered_tasks[tid] 2261 covered.difference_update(notcovered) 2262 covered.intersection_update(self.tasks_scenequeue_done) 2263 2264 for tid in notcovered | covered: 2265 if len(self.rqdata.runtaskentries[tid].depends) == 0: 2266 self.setbuildable(tid) 2267 elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): 2268 self.setbuildable(tid) 2269 2270 self.tasks_covered = covered 2271 self.tasks_notcovered = notcovered 2272 2273 self.holdoff_tasks = set() 2274 2275 for tid in self.rqdata.runq_setscene_tids: 2276 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2277 self.holdoff_tasks.add(tid) 2278 2279 for tid in self.holdoff_tasks.copy(): 2280 for dep in self.sqdata.sq_covered_tasks[tid]: 2281 if dep not in self.runq_complete: 2282 self.holdoff_tasks.add(dep) 2283 2284 self.holdoff_need_update = False 2285 2286 def process_possible_migrations(self): 2287 2288 changed = set() 2289 toprocess = set() 2290 for tid, unihash in self.updated_taskhash_queue.copy(): 2291 if tid in self.runq_running and tid not in self.runq_complete: 2292 continue 2293 2294 self.updated_taskhash_queue.remove((tid, unihash)) 2295 2296 if unihash != self.rqdata.runtaskentries[tid].unihash: 2297 # Make sure we rehash any other tasks with the same task hash that we're deferred against. 2298 torehash = [tid] 2299 for deftid in self.sq_deferred: 2300 if self.sq_deferred[deftid] == tid: 2301 torehash.append(deftid) 2302 for hashtid in torehash: 2303 hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) 2304 self.rqdata.runtaskentries[hashtid].unihash = unihash 2305 bb.parse.siggen.set_unihash(hashtid, unihash) 2306 toprocess.add(hashtid) 2307 2308 # Work out all tasks which depend upon these 2309 total = set() 2310 next = set() 2311 for p in toprocess: 2312 next |= self.rqdata.runtaskentries[p].revdeps 2313 while next: 2314 current = next.copy() 2315 total = total | next 2316 next = set() 2317 for ntid in current: 2318 next |= self.rqdata.runtaskentries[ntid].revdeps 2319 next.difference_update(total) 2320 2321 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash 2322 next = set() 2323 for p in total: 2324 if len(self.rqdata.runtaskentries[p].depends) == 0: 2325 next.add(p) 2326 elif self.rqdata.runtaskentries[p].depends.isdisjoint(total): 2327 next.add(p) 2328 2329 # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled 2330 while next: 2331 current = next.copy() 2332 next = set() 2333 for tid in current: 2334 if len(self.rqdata.runtaskentries[p].depends) and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): 2335 continue 2336 orighash = self.rqdata.runtaskentries[tid].hash 2337 dc = bb.parse.siggen.get_data_caches(self.rqdata.dataCaches, mc_from_tid(tid)) 2338 newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, dc) 2339 origuni = self.rqdata.runtaskentries[tid].unihash 2340 newuni = bb.parse.siggen.get_unihash(tid) 2341 # FIXME, need to check it can come from sstate at all for determinism? 2342 remapped = False 2343 if newuni == origuni: 2344 # Nothing to do, we match, skip code below 2345 remapped = True 2346 elif tid in self.scenequeue_covered or tid in self.sq_live: 2347 # Already ran this setscene task or it running. Report the new taskhash 2348 bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches) 2349 hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid)) 2350 remapped = True 2351 2352 if not remapped: 2353 #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni)) 2354 self.rqdata.runtaskentries[tid].hash = newhash 2355 self.rqdata.runtaskentries[tid].unihash = newuni 2356 changed.add(tid) 2357 2358 next |= self.rqdata.runtaskentries[tid].revdeps 2359 total.remove(tid) 2360 next.intersection_update(total) 2361 2362 if changed: 2363 for mc in self.rq.worker: 2364 self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2365 for mc in self.rq.fakeworker: 2366 self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2367 2368 hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) 2369 2370 for tid in changed: 2371 if tid not in self.rqdata.runq_setscene_tids: 2372 continue 2373 if tid not in self.pending_migrations: 2374 self.pending_migrations.add(tid) 2375 2376 update_tasks = [] 2377 for tid in self.pending_migrations.copy(): 2378 if tid in self.runq_running or tid in self.sq_live: 2379 # Too late, task already running, not much we can do now 2380 self.pending_migrations.remove(tid) 2381 continue 2382 2383 valid = True 2384 # Check no tasks this covers are running 2385 for dep in self.sqdata.sq_covered_tasks[tid]: 2386 if dep in self.runq_running and dep not in self.runq_complete: 2387 hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid)) 2388 valid = False 2389 break 2390 if not valid: 2391 continue 2392 2393 self.pending_migrations.remove(tid) 2394 changed = True 2395 2396 if tid in self.tasks_scenequeue_done: 2397 self.tasks_scenequeue_done.remove(tid) 2398 for dep in self.sqdata.sq_covered_tasks[tid]: 2399 if dep in self.runq_complete and dep not in self.runq_tasksrun: 2400 bb.error("Task %s marked as completed but now needing to rerun? Aborting build." % dep) 2401 self.failed_tids.append(tid) 2402 self.rq.state = runQueueCleanUp 2403 return 2404 2405 if dep not in self.runq_complete: 2406 if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable: 2407 self.tasks_scenequeue_done.remove(dep) 2408 2409 if tid in self.sq_buildable: 2410 self.sq_buildable.remove(tid) 2411 if tid in self.sq_running: 2412 self.sq_running.remove(tid) 2413 harddepfail = False 2414 for t in self.sqdata.sq_harddeps: 2415 if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered: 2416 harddepfail = True 2417 break 2418 if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2419 if tid not in self.sq_buildable: 2420 self.sq_buildable.add(tid) 2421 if len(self.sqdata.sq_revdeps[tid]) == 0: 2422 self.sq_buildable.add(tid) 2423 2424 if tid in self.sqdata.outrightfail: 2425 self.sqdata.outrightfail.remove(tid) 2426 if tid in self.scenequeue_notcovered: 2427 self.scenequeue_notcovered.remove(tid) 2428 if tid in self.scenequeue_covered: 2429 self.scenequeue_covered.remove(tid) 2430 if tid in self.scenequeue_notneeded: 2431 self.scenequeue_notneeded.remove(tid) 2432 2433 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2434 self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) 2435 2436 if tid in self.stampcache: 2437 del self.stampcache[tid] 2438 2439 if tid in self.build_stamps: 2440 del self.build_stamps[tid] 2441 2442 update_tasks.append((tid, harddepfail, tid in self.sqdata.valid)) 2443 2444 if update_tasks: 2445 self.sqdone = False 2446 update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2447 2448 for (tid, harddepfail, origvalid) in update_tasks: 2449 if tid in self.sqdata.valid and not origvalid: 2450 hashequiv_logger.verbose("Setscene task %s became valid" % tid) 2451 if harddepfail: 2452 self.sq_task_failoutright(tid) 2453 2454 if changed: 2455 self.holdoff_need_update = True 2456 2457 def scenequeue_updatecounters(self, task, fail=False): 2458 2459 for dep in sorted(self.sqdata.sq_deps[task]): 2460 if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: 2461 if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered: 2462 # dependency could be already processed, e.g. noexec setscene task 2463 continue 2464 noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache) 2465 if noexec or stamppresent: 2466 continue 2467 logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) 2468 self.sq_task_failoutright(dep) 2469 continue 2470 if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2471 if dep not in self.sq_buildable: 2472 self.sq_buildable.add(dep) 2473 2474 next = set([task]) 2475 while next: 2476 new = set() 2477 for t in sorted(next): 2478 self.tasks_scenequeue_done.add(t) 2479 # Look down the dependency chain for non-setscene things which this task depends on 2480 # and mark as 'done' 2481 for dep in self.rqdata.runtaskentries[t].depends: 2482 if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: 2483 continue 2484 if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): 2485 new.add(dep) 2486 next = new 2487 2488 self.holdoff_need_update = True 2489 2490 def sq_task_completeoutright(self, task): 2491 """ 2492 Mark a task as completed 2493 Look at the reverse dependencies and mark any task with 2494 completed dependencies as buildable 2495 """ 2496 2497 logger.debug('Found task %s which could be accelerated', task) 2498 self.scenequeue_covered.add(task) 2499 self.scenequeue_updatecounters(task) 2500 2501 def sq_check_taskfail(self, task): 2502 if self.rqdata.setscenewhitelist is not None: 2503 realtask = task.split('_setscene')[0] 2504 (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) 2505 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2506 if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): 2507 logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) 2508 self.rq.state = runQueueCleanUp 2509 2510 def sq_task_complete(self, task): 2511 self.sq_stats.taskCompleted() 2512 bb.event.fire(sceneQueueTaskCompleted(task, self.sq_stats, self.rq), self.cfgData) 2513 self.sq_task_completeoutright(task) 2514 2515 def sq_task_fail(self, task, result): 2516 self.sq_stats.taskFailed() 2517 bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData) 2518 self.scenequeue_notcovered.add(task) 2519 self.scenequeue_updatecounters(task, True) 2520 self.sq_check_taskfail(task) 2521 2522 def sq_task_failoutright(self, task): 2523 self.sq_running.add(task) 2524 self.sq_buildable.add(task) 2525 self.sq_stats.taskSkipped() 2526 self.sq_stats.taskCompleted() 2527 self.scenequeue_notcovered.add(task) 2528 self.scenequeue_updatecounters(task, True) 2529 2530 def sq_task_skip(self, task): 2531 self.sq_running.add(task) 2532 self.sq_buildable.add(task) 2533 self.sq_task_completeoutright(task) 2534 self.sq_stats.taskSkipped() 2535 self.sq_stats.taskCompleted() 2536 2537 def sq_build_taskdepdata(self, task): 2538 def getsetscenedeps(tid): 2539 deps = set() 2540 (mc, fn, taskname, _) = split_tid_mcfn(tid) 2541 realtid = tid + "_setscene" 2542 idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends 2543 for (depname, idependtask) in idepends: 2544 if depname not in self.rqdata.taskData[mc].build_targets: 2545 continue 2546 2547 depfn = self.rqdata.taskData[mc].build_targets[depname][0] 2548 if depfn is None: 2549 continue 2550 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2551 deps.add(deptid) 2552 return deps 2553 2554 taskdepdata = {} 2555 next = getsetscenedeps(task) 2556 next.add(task) 2557 while next: 2558 additional = [] 2559 for revdep in next: 2560 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2561 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2562 deps = getsetscenedeps(revdep) 2563 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2564 taskhash = self.rqdata.runtaskentries[revdep].hash 2565 unihash = self.rqdata.runtaskentries[revdep].unihash 2566 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] 2567 for revdep2 in deps: 2568 if revdep2 not in taskdepdata: 2569 additional.append(revdep2) 2570 next = additional 2571 2572 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2573 return taskdepdata 2574 2575 def check_setscenewhitelist(self, tid): 2576 # Check task that is going to run against the whitelist 2577 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2578 # Ignore covered tasks 2579 if tid in self.tasks_covered: 2580 return False 2581 # Ignore stamped tasks 2582 if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): 2583 return False 2584 # Ignore noexec tasks 2585 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2586 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2587 return False 2588 2589 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2590 if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): 2591 if tid in self.rqdata.runq_setscene_tids: 2592 msg = 'Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname) 2593 else: 2594 msg = 'Task %s.%s attempted to execute unexpectedly' % (pn, taskname) 2595 for t in self.scenequeue_notcovered: 2596 msg = msg + "\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash) 2597 logger.error(msg + '\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered)) 2598 return True 2599 return False 2600 2601class SQData(object): 2602 def __init__(self): 2603 # SceneQueue dependencies 2604 self.sq_deps = {} 2605 # SceneQueue reverse dependencies 2606 self.sq_revdeps = {} 2607 # Injected inter-setscene task dependencies 2608 self.sq_harddeps = {} 2609 # Cache of stamp files so duplicates can't run in parallel 2610 self.stamps = {} 2611 # Setscene tasks directly depended upon by the build 2612 self.unskippable = set() 2613 # List of setscene tasks which aren't present 2614 self.outrightfail = set() 2615 # A list of normal tasks a setscene task covers 2616 self.sq_covered_tasks = {} 2617 2618def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): 2619 2620 sq_revdeps = {} 2621 sq_revdeps_squash = {} 2622 sq_collated_deps = {} 2623 2624 # We need to construct a dependency graph for the setscene functions. Intermediate 2625 # dependencies between the setscene tasks only complicate the code. This code 2626 # therefore aims to collapse the huge runqueue dependency tree into a smaller one 2627 # only containing the setscene functions. 2628 2629 rqdata.init_progress_reporter.next_stage() 2630 2631 # First process the chains up to the first setscene task. 2632 endpoints = {} 2633 for tid in rqdata.runtaskentries: 2634 sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) 2635 sq_revdeps_squash[tid] = set() 2636 if (len(sq_revdeps[tid]) == 0) and tid not in rqdata.runq_setscene_tids: 2637 #bb.warn("Added endpoint %s" % (tid)) 2638 endpoints[tid] = set() 2639 2640 rqdata.init_progress_reporter.next_stage() 2641 2642 # Secondly process the chains between setscene tasks. 2643 for tid in rqdata.runq_setscene_tids: 2644 sq_collated_deps[tid] = set() 2645 #bb.warn("Added endpoint 2 %s" % (tid)) 2646 for dep in rqdata.runtaskentries[tid].depends: 2647 if tid in sq_revdeps[dep]: 2648 sq_revdeps[dep].remove(tid) 2649 if dep not in endpoints: 2650 endpoints[dep] = set() 2651 #bb.warn(" Added endpoint 3 %s" % (dep)) 2652 endpoints[dep].add(tid) 2653 2654 rqdata.init_progress_reporter.next_stage() 2655 2656 def process_endpoints(endpoints): 2657 newendpoints = {} 2658 for point, task in endpoints.items(): 2659 tasks = set() 2660 if task: 2661 tasks |= task 2662 if sq_revdeps_squash[point]: 2663 tasks |= sq_revdeps_squash[point] 2664 if point not in rqdata.runq_setscene_tids: 2665 for t in tasks: 2666 sq_collated_deps[t].add(point) 2667 sq_revdeps_squash[point] = set() 2668 if point in rqdata.runq_setscene_tids: 2669 sq_revdeps_squash[point] = tasks 2670 tasks = set() 2671 continue 2672 for dep in rqdata.runtaskentries[point].depends: 2673 if point in sq_revdeps[dep]: 2674 sq_revdeps[dep].remove(point) 2675 if tasks: 2676 sq_revdeps_squash[dep] |= tasks 2677 if len(sq_revdeps[dep]) == 0 and dep not in rqdata.runq_setscene_tids: 2678 newendpoints[dep] = task 2679 if len(newendpoints) != 0: 2680 process_endpoints(newendpoints) 2681 2682 process_endpoints(endpoints) 2683 2684 rqdata.init_progress_reporter.next_stage() 2685 2686 # Build a list of tasks which are "unskippable" 2687 # These are direct endpoints referenced by the build upto and including setscene tasks 2688 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon 2689 new = True 2690 for tid in rqdata.runtaskentries: 2691 if len(rqdata.runtaskentries[tid].revdeps) == 0: 2692 sqdata.unskippable.add(tid) 2693 sqdata.unskippable |= sqrq.cantskip 2694 while new: 2695 new = False 2696 orig = sqdata.unskippable.copy() 2697 for tid in sorted(orig, reverse=True): 2698 if tid in rqdata.runq_setscene_tids: 2699 continue 2700 if len(rqdata.runtaskentries[tid].depends) == 0: 2701 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable 2702 sqrq.setbuildable(tid) 2703 sqdata.unskippable |= rqdata.runtaskentries[tid].depends 2704 if sqdata.unskippable != orig: 2705 new = True 2706 2707 sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids) 2708 2709 rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) 2710 2711 # Sanity check all dependencies could be changed to setscene task references 2712 for taskcounter, tid in enumerate(rqdata.runtaskentries): 2713 if tid in rqdata.runq_setscene_tids: 2714 pass 2715 elif len(sq_revdeps_squash[tid]) != 0: 2716 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.") 2717 else: 2718 del sq_revdeps_squash[tid] 2719 rqdata.init_progress_reporter.update(taskcounter) 2720 2721 rqdata.init_progress_reporter.next_stage() 2722 2723 # Resolve setscene inter-task dependencies 2724 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" 2725 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies 2726 for tid in rqdata.runq_setscene_tids: 2727 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2728 realtid = tid + "_setscene" 2729 idepends = rqdata.taskData[mc].taskentries[realtid].idepends 2730 sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", rqdata.dataCaches[mc], taskfn, noextra=True) 2731 for (depname, idependtask) in idepends: 2732 2733 if depname not in rqdata.taskData[mc].build_targets: 2734 continue 2735 2736 depfn = rqdata.taskData[mc].build_targets[depname][0] 2737 if depfn is None: 2738 continue 2739 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2740 if deptid not in rqdata.runtaskentries: 2741 bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) 2742 2743 if not deptid in sqdata.sq_harddeps: 2744 sqdata.sq_harddeps[deptid] = set() 2745 sqdata.sq_harddeps[deptid].add(tid) 2746 2747 sq_revdeps_squash[tid].add(deptid) 2748 # Have to zero this to avoid circular dependencies 2749 sq_revdeps_squash[deptid] = set() 2750 2751 rqdata.init_progress_reporter.next_stage() 2752 2753 for task in sqdata.sq_harddeps: 2754 for dep in sqdata.sq_harddeps[task]: 2755 sq_revdeps_squash[dep].add(task) 2756 2757 rqdata.init_progress_reporter.next_stage() 2758 2759 #for tid in sq_revdeps_squash: 2760 # data = "" 2761 # for dep in sq_revdeps_squash[tid]: 2762 # data = data + "\n %s" % dep 2763 # bb.warn("Task %s_setscene: is %s " % (tid, data)) 2764 2765 sqdata.sq_revdeps = sq_revdeps_squash 2766 sqdata.sq_covered_tasks = sq_collated_deps 2767 2768 # Build reverse version of revdeps to populate deps structure 2769 for tid in sqdata.sq_revdeps: 2770 sqdata.sq_deps[tid] = set() 2771 for tid in sqdata.sq_revdeps: 2772 for dep in sqdata.sq_revdeps[tid]: 2773 sqdata.sq_deps[dep].add(tid) 2774 2775 rqdata.init_progress_reporter.next_stage() 2776 2777 sqdata.multiconfigs = set() 2778 for tid in sqdata.sq_revdeps: 2779 sqdata.multiconfigs.add(mc_from_tid(tid)) 2780 if len(sqdata.sq_revdeps[tid]) == 0: 2781 sqrq.sq_buildable.add(tid) 2782 2783 rqdata.init_progress_reporter.finish() 2784 2785 sqdata.noexec = set() 2786 sqdata.stamppresent = set() 2787 sqdata.valid = set() 2788 2789 update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True) 2790 2791 # Compute a list of 'stale' sstate tasks where the current hash does not match the one 2792 # in any stamp files. Pass the list out to metadata as an event. 2793 found = {} 2794 for tid in rqdata.runq_setscene_tids: 2795 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2796 stamps = bb.build.find_stale_stamps(taskname, rqdata.dataCaches[mc], taskfn) 2797 if stamps: 2798 if mc not in found: 2799 found[mc] = {} 2800 found[mc][tid] = stamps 2801 for mc in found: 2802 event = bb.event.StaleSetSceneTasks(found[mc]) 2803 bb.event.fire(event, cooker.databuilder.mcdata[mc]) 2804 2805def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False): 2806 2807 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2808 2809 taskdep = rqdata.dataCaches[mc].task_deps[taskfn] 2810 2811 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2812 bb.build.make_stamp(taskname + "_setscene", rqdata.dataCaches[mc], taskfn) 2813 return True, False 2814 2815 if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache): 2816 logger.debug2('Setscene stamp current for task %s', tid) 2817 return False, True 2818 2819 if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache): 2820 logger.debug2('Normal stamp current for task %s', tid) 2821 return False, True 2822 2823 return False, False 2824 2825def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): 2826 2827 tocheck = set() 2828 2829 for tid in sorted(tids): 2830 if tid in sqdata.stamppresent: 2831 sqdata.stamppresent.remove(tid) 2832 if tid in sqdata.valid: 2833 sqdata.valid.remove(tid) 2834 if tid in sqdata.outrightfail: 2835 sqdata.outrightfail.remove(tid) 2836 2837 noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True) 2838 2839 if noexec: 2840 sqdata.noexec.add(tid) 2841 sqrq.sq_task_skip(tid) 2842 continue 2843 2844 if stamppresent: 2845 sqdata.stamppresent.add(tid) 2846 sqrq.sq_task_skip(tid) 2847 continue 2848 2849 tocheck.add(tid) 2850 2851 sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) 2852 2853 sqdata.hashes = {} 2854 sqrq.sq_deferred = {} 2855 for mc in sorted(sqdata.multiconfigs): 2856 for tid in sorted(sqdata.sq_revdeps): 2857 if mc_from_tid(tid) != mc: 2858 continue 2859 if tid in sqdata.stamppresent: 2860 continue 2861 if tid in sqdata.valid: 2862 continue 2863 if tid in sqdata.noexec: 2864 continue 2865 if tid in sqrq.scenequeue_notcovered: 2866 continue 2867 if tid in sqrq.scenequeue_covered: 2868 continue 2869 2870 h = pending_hash_index(tid, rqdata) 2871 if h not in sqdata.hashes: 2872 if tid in tids: 2873 sqdata.outrightfail.add(tid) 2874 sqdata.hashes[h] = tid 2875 else: 2876 sqrq.sq_deferred[tid] = sqdata.hashes[h] 2877 bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h])) 2878 2879 2880class TaskFailure(Exception): 2881 """ 2882 Exception raised when a task in a runqueue fails 2883 """ 2884 def __init__(self, x): 2885 self.args = x 2886 2887 2888class runQueueExitWait(bb.event.Event): 2889 """ 2890 Event when waiting for task processes to exit 2891 """ 2892 2893 def __init__(self, remain): 2894 self.remain = remain 2895 self.message = "Waiting for %s active tasks to finish" % remain 2896 bb.event.Event.__init__(self) 2897 2898class runQueueEvent(bb.event.Event): 2899 """ 2900 Base runQueue event class 2901 """ 2902 def __init__(self, task, stats, rq): 2903 self.taskid = task 2904 self.taskstring = task 2905 self.taskname = taskname_from_tid(task) 2906 self.taskfile = fn_from_tid(task) 2907 self.taskhash = rq.rqdata.get_task_hash(task) 2908 self.stats = stats.copy() 2909 bb.event.Event.__init__(self) 2910 2911class sceneQueueEvent(runQueueEvent): 2912 """ 2913 Base sceneQueue event class 2914 """ 2915 def __init__(self, task, stats, rq, noexec=False): 2916 runQueueEvent.__init__(self, task, stats, rq) 2917 self.taskstring = task + "_setscene" 2918 self.taskname = taskname_from_tid(task) + "_setscene" 2919 self.taskfile = fn_from_tid(task) 2920 self.taskhash = rq.rqdata.get_task_hash(task) 2921 2922class runQueueTaskStarted(runQueueEvent): 2923 """ 2924 Event notifying a task was started 2925 """ 2926 def __init__(self, task, stats, rq, noexec=False): 2927 runQueueEvent.__init__(self, task, stats, rq) 2928 self.noexec = noexec 2929 2930class sceneQueueTaskStarted(sceneQueueEvent): 2931 """ 2932 Event notifying a setscene task was started 2933 """ 2934 def __init__(self, task, stats, rq, noexec=False): 2935 sceneQueueEvent.__init__(self, task, stats, rq) 2936 self.noexec = noexec 2937 2938class runQueueTaskFailed(runQueueEvent): 2939 """ 2940 Event notifying a task failed 2941 """ 2942 def __init__(self, task, stats, exitcode, rq, fakeroot_log=None): 2943 runQueueEvent.__init__(self, task, stats, rq) 2944 self.exitcode = exitcode 2945 self.fakeroot_log = fakeroot_log 2946 2947 def __str__(self): 2948 if self.fakeroot_log: 2949 return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log) 2950 else: 2951 return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) 2952 2953class sceneQueueTaskFailed(sceneQueueEvent): 2954 """ 2955 Event notifying a setscene task failed 2956 """ 2957 def __init__(self, task, stats, exitcode, rq): 2958 sceneQueueEvent.__init__(self, task, stats, rq) 2959 self.exitcode = exitcode 2960 2961 def __str__(self): 2962 return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode) 2963 2964class sceneQueueComplete(sceneQueueEvent): 2965 """ 2966 Event when all the sceneQueue tasks are complete 2967 """ 2968 def __init__(self, stats, rq): 2969 self.stats = stats.copy() 2970 bb.event.Event.__init__(self) 2971 2972class runQueueTaskCompleted(runQueueEvent): 2973 """ 2974 Event notifying a task completed 2975 """ 2976 2977class sceneQueueTaskCompleted(sceneQueueEvent): 2978 """ 2979 Event notifying a setscene task completed 2980 """ 2981 2982class runQueueTaskSkipped(runQueueEvent): 2983 """ 2984 Event notifying a task was skipped 2985 """ 2986 def __init__(self, task, stats, rq, reason): 2987 runQueueEvent.__init__(self, task, stats, rq) 2988 self.reason = reason 2989 2990class taskUniHashUpdate(bb.event.Event): 2991 """ 2992 Base runQueue event class 2993 """ 2994 def __init__(self, task, unihash): 2995 self.taskid = task 2996 self.unihash = unihash 2997 bb.event.Event.__init__(self) 2998 2999class runQueuePipe(): 3000 """ 3001 Abstraction for a pipe between a worker thread and the server 3002 """ 3003 def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None): 3004 self.input = pipein 3005 if pipeout: 3006 pipeout.close() 3007 bb.utils.nonblockingfd(self.input) 3008 self.queue = b"" 3009 self.d = d 3010 self.rq = rq 3011 self.rqexec = rqexec 3012 self.fakerootlogs = fakerootlogs 3013 3014 def setrunqueueexec(self, rqexec): 3015 self.rqexec = rqexec 3016 3017 def read(self): 3018 for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]: 3019 for worker in workers.values(): 3020 worker.process.poll() 3021 if worker.process.returncode is not None and not self.rq.teardown: 3022 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode))) 3023 self.rq.finish_runqueue(True) 3024 3025 start = len(self.queue) 3026 try: 3027 self.queue = self.queue + (self.input.read(102400) or b"") 3028 except (OSError, IOError) as e: 3029 if e.errno != errno.EAGAIN: 3030 raise 3031 end = len(self.queue) 3032 found = True 3033 while found and len(self.queue): 3034 found = False 3035 index = self.queue.find(b"</event>") 3036 while index != -1 and self.queue.startswith(b"<event>"): 3037 try: 3038 event = pickle.loads(self.queue[7:index]) 3039 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3040 if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e): 3041 # The pickled data could contain "</event>" so search for the next occurance 3042 # unpickling again, this should be the only way an unpickle error could occur 3043 index = self.queue.find(b"</event>", index + 1) 3044 continue 3045 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) 3046 bb.event.fire_from_worker(event, self.d) 3047 if isinstance(event, taskUniHashUpdate): 3048 self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) 3049 found = True 3050 self.queue = self.queue[index+8:] 3051 index = self.queue.find(b"</event>") 3052 index = self.queue.find(b"</exitcode>") 3053 while index != -1 and self.queue.startswith(b"<exitcode>"): 3054 try: 3055 task, status = pickle.loads(self.queue[10:index]) 3056 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3057 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) 3058 (_, _, _, taskfn) = split_tid_mcfn(task) 3059 fakerootlog = None 3060 if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs: 3061 fakerootlog = self.fakerootlogs[taskfn] 3062 self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog) 3063 found = True 3064 self.queue = self.queue[index+11:] 3065 index = self.queue.find(b"</exitcode>") 3066 return (end > start) 3067 3068 def close(self): 3069 while self.read(): 3070 continue 3071 if len(self.queue) > 0: 3072 print("Warning, worker left partial message: %s" % self.queue) 3073 self.input.close() 3074 3075def get_setscene_enforce_whitelist(d, targets): 3076 if d.getVar('BB_SETSCENE_ENFORCE') != '1': 3077 return None 3078 whitelist = (d.getVar("BB_SETSCENE_ENFORCE_WHITELIST") or "").split() 3079 outlist = [] 3080 for item in whitelist[:]: 3081 if item.startswith('%:'): 3082 for (mc, target, task, fn) in targets: 3083 outlist.append(target + ':' + item.split(':')[1]) 3084 else: 3085 outlist.append(item) 3086 return outlist 3087 3088def check_setscene_enforce_whitelist(pn, taskname, whitelist): 3089 import fnmatch 3090 if whitelist is not None: 3091 item = '%s:%s' % (pn, taskname) 3092 for whitelist_item in whitelist: 3093 if fnmatch.fnmatch(item, whitelist_item): 3094 return True 3095 return False 3096 return True 3097