1""" 2BitBake 'RunQueue' implementation 3 4Handles preparation and execution of a queue of tasks 5""" 6 7# Copyright (C) 2006-2007 Richard Purdie 8# 9# SPDX-License-Identifier: GPL-2.0-only 10# 11 12import copy 13import os 14import sys 15import stat 16import errno 17import logging 18import re 19import bb 20from bb import msg, event 21from bb import monitordisk 22import subprocess 23import pickle 24from multiprocessing import Process 25import shlex 26import pprint 27import time 28 29bblogger = logging.getLogger("BitBake") 30logger = logging.getLogger("BitBake.RunQueue") 31hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv") 32 33__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' ) 34 35def fn_from_tid(tid): 36 return tid.rsplit(":", 1)[0] 37 38def taskname_from_tid(tid): 39 return tid.rsplit(":", 1)[1] 40 41def mc_from_tid(tid): 42 if tid.startswith('mc:') and tid.count(':') >= 2: 43 return tid.split(':')[1] 44 return "" 45 46def split_tid(tid): 47 (mc, fn, taskname, _) = split_tid_mcfn(tid) 48 return (mc, fn, taskname) 49 50def split_mc(n): 51 if n.startswith("mc:") and n.count(':') >= 2: 52 _, mc, n = n.split(":", 2) 53 return (mc, n) 54 return ('', n) 55 56def split_tid_mcfn(tid): 57 if tid.startswith('mc:') and tid.count(':') >= 2: 58 elems = tid.split(':') 59 mc = elems[1] 60 fn = ":".join(elems[2:-1]) 61 taskname = elems[-1] 62 mcfn = "mc:" + mc + ":" + fn 63 else: 64 tid = tid.rsplit(":", 1) 65 mc = "" 66 fn = tid[0] 67 taskname = tid[1] 68 mcfn = fn 69 70 return (mc, fn, taskname, mcfn) 71 72def build_tid(mc, fn, taskname): 73 if mc: 74 return "mc:" + mc + ":" + fn + ":" + taskname 75 return fn + ":" + taskname 76 77# Index used to pair up potentially matching multiconfig tasks 78# We match on PN, taskname and hash being equal 79def pending_hash_index(tid, rqdata): 80 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 81 pn = rqdata.dataCaches[mc].pkg_fn[taskfn] 82 h = rqdata.runtaskentries[tid].unihash 83 return pn + ":" + "taskname" + h 84 85class RunQueueStats: 86 """ 87 Holds statistics on the tasks handled by the associated runQueue 88 """ 89 def __init__(self, total, setscene_total): 90 self.completed = 0 91 self.skipped = 0 92 self.failed = 0 93 self.active = 0 94 self.setscene_active = 0 95 self.setscene_covered = 0 96 self.setscene_notcovered = 0 97 self.setscene_total = setscene_total 98 self.total = total 99 100 def copy(self): 101 obj = self.__class__(self.total, self.setscene_total) 102 obj.__dict__.update(self.__dict__) 103 return obj 104 105 def taskFailed(self): 106 self.active = self.active - 1 107 self.failed = self.failed + 1 108 109 def taskCompleted(self): 110 self.active = self.active - 1 111 self.completed = self.completed + 1 112 113 def taskSkipped(self): 114 self.active = self.active + 1 115 self.skipped = self.skipped + 1 116 117 def taskActive(self): 118 self.active = self.active + 1 119 120 def updateCovered(self, covered, notcovered): 121 self.setscene_covered = covered 122 self.setscene_notcovered = notcovered 123 124 def updateActiveSetscene(self, active): 125 self.setscene_active = active 126 127# These values indicate the next step due to be run in the 128# runQueue state machine 129runQueuePrepare = 2 130runQueueSceneInit = 3 131runQueueRunning = 6 132runQueueFailed = 7 133runQueueCleanUp = 8 134runQueueComplete = 9 135 136class RunQueueScheduler(object): 137 """ 138 Control the order tasks are scheduled in. 139 """ 140 name = "basic" 141 142 def __init__(self, runqueue, rqdata): 143 """ 144 The default scheduler just returns the first buildable task (the 145 priority map is sorted by task number) 146 """ 147 self.rq = runqueue 148 self.rqdata = rqdata 149 self.numTasks = len(self.rqdata.runtaskentries) 150 151 self.prio_map = [self.rqdata.runtaskentries.keys()] 152 153 self.buildable = set() 154 self.skip_maxthread = {} 155 self.stamps = {} 156 for tid in self.rqdata.runtaskentries: 157 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 158 self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 159 if tid in self.rq.runq_buildable: 160 self.buildable.append(tid) 161 162 self.rev_prio_map = None 163 self.is_pressure_usable() 164 165 def is_pressure_usable(self): 166 """ 167 If monitoring pressure, return True if pressure files can be open and read. For example 168 openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported) 169 is returned. 170 """ 171 if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure: 172 try: 173 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 174 open("/proc/pressure/io") as io_pressure_fds, \ 175 open("/proc/pressure/memory") as memory_pressure_fds: 176 177 self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 178 self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 179 self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 180 self.prev_pressure_time = time.time() 181 self.check_pressure = True 182 except: 183 bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure") 184 self.check_pressure = False 185 else: 186 self.check_pressure = False 187 188 def exceeds_max_pressure(self): 189 """ 190 Monitor the difference in total pressure at least once per second, if 191 BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold. 192 """ 193 if self.check_pressure: 194 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 195 open("/proc/pressure/io") as io_pressure_fds, \ 196 open("/proc/pressure/memory") as memory_pressure_fds: 197 # extract "total" from /proc/pressure/{cpu|io} 198 curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 199 curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 200 curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 201 now = time.time() 202 tdiff = now - self.prev_pressure_time 203 psi_accumulation_interval = 1.0 204 cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff 205 io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff 206 memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff 207 exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure 208 exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure 209 exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure 210 211 if tdiff > psi_accumulation_interval: 212 self.prev_cpu_pressure = curr_cpu_pressure 213 self.prev_io_pressure = curr_io_pressure 214 self.prev_memory_pressure = curr_memory_pressure 215 self.prev_pressure_time = now 216 217 pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure) 218 pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure) 219 if hasattr(self, "pressure_state") and pressure_state != self.pressure_state: 220 bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))) 221 self.pressure_state = pressure_state 222 return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure) 223 return False 224 225 def next_buildable_task(self): 226 """ 227 Return the id of the first task we find that is buildable 228 """ 229 # Once tasks are running we don't need to worry about them again 230 self.buildable.difference_update(self.rq.runq_running) 231 buildable = set(self.buildable) 232 buildable.difference_update(self.rq.holdoff_tasks) 233 buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered) 234 if not buildable: 235 return None 236 237 # Bitbake requires that at least one task be active. Only check for pressure if 238 # this is the case, otherwise the pressure limitation could result in no tasks 239 # being active and no new tasks started thereby, at times, breaking the scheduler. 240 if self.rq.stats.active and self.exceeds_max_pressure(): 241 return None 242 243 # Filter out tasks that have a max number of threads that have been exceeded 244 skip_buildable = {} 245 for running in self.rq.runq_running.difference(self.rq.runq_complete): 246 rtaskname = taskname_from_tid(running) 247 if rtaskname not in self.skip_maxthread: 248 self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads") 249 if not self.skip_maxthread[rtaskname]: 250 continue 251 if rtaskname in skip_buildable: 252 skip_buildable[rtaskname] += 1 253 else: 254 skip_buildable[rtaskname] = 1 255 256 if len(buildable) == 1: 257 tid = buildable.pop() 258 taskname = taskname_from_tid(tid) 259 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 260 return None 261 stamp = self.stamps[tid] 262 if stamp not in self.rq.build_stamps.values(): 263 return tid 264 265 if not self.rev_prio_map: 266 self.rev_prio_map = {} 267 for tid in self.rqdata.runtaskentries: 268 self.rev_prio_map[tid] = self.prio_map.index(tid) 269 270 best = None 271 bestprio = None 272 for tid in buildable: 273 taskname = taskname_from_tid(tid) 274 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 275 continue 276 prio = self.rev_prio_map[tid] 277 if bestprio is None or bestprio > prio: 278 stamp = self.stamps[tid] 279 if stamp in self.rq.build_stamps.values(): 280 continue 281 bestprio = prio 282 best = tid 283 284 return best 285 286 def next(self): 287 """ 288 Return the id of the task we should build next 289 """ 290 if self.rq.can_start_task(): 291 return self.next_buildable_task() 292 293 def newbuildable(self, task): 294 self.buildable.add(task) 295 296 def removebuildable(self, task): 297 self.buildable.remove(task) 298 299 def describe_task(self, taskid): 300 result = 'ID %s' % taskid 301 if self.rev_prio_map: 302 result = result + (' pri %d' % self.rev_prio_map[taskid]) 303 return result 304 305 def dump_prio(self, comment): 306 bb.debug(3, '%s (most important first):\n%s' % 307 (comment, 308 '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for 309 index, taskid in enumerate(self.prio_map)]))) 310 311class RunQueueSchedulerSpeed(RunQueueScheduler): 312 """ 313 A scheduler optimised for speed. The priority map is sorted by task weight, 314 heavier weighted tasks (tasks needed by the most other tasks) are run first. 315 """ 316 name = "speed" 317 318 def __init__(self, runqueue, rqdata): 319 """ 320 The priority map is sorted by task weight. 321 """ 322 RunQueueScheduler.__init__(self, runqueue, rqdata) 323 324 weights = {} 325 for tid in self.rqdata.runtaskentries: 326 weight = self.rqdata.runtaskentries[tid].weight 327 if not weight in weights: 328 weights[weight] = [] 329 weights[weight].append(tid) 330 331 self.prio_map = [] 332 for weight in sorted(weights): 333 for w in weights[weight]: 334 self.prio_map.append(w) 335 336 self.prio_map.reverse() 337 338class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): 339 """ 340 A scheduler optimised to complete .bb files as quickly as possible. The 341 priority map is sorted by task weight, but then reordered so once a given 342 .bb file starts to build, it's completed as quickly as possible by 343 running all tasks related to the same .bb file one after the after. 344 This works well where disk space is at a premium and classes like OE's 345 rm_work are in force. 346 """ 347 name = "completion" 348 349 def __init__(self, runqueue, rqdata): 350 super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata) 351 352 # Extract list of tasks for each recipe, with tasks sorted 353 # ascending from "must run first" (typically do_fetch) to 354 # "runs last" (do_build). The speed scheduler prioritizes 355 # tasks that must run first before the ones that run later; 356 # this is what we depend on here. 357 task_lists = {} 358 for taskid in self.prio_map: 359 fn, taskname = taskid.rsplit(':', 1) 360 task_lists.setdefault(fn, []).append(taskname) 361 362 # Now unify the different task lists. The strategy is that 363 # common tasks get skipped and new ones get inserted after the 364 # preceeding common one(s) as they are found. Because task 365 # lists should differ only by their number of tasks, but not 366 # the ordering of the common tasks, this should result in a 367 # deterministic result that is a superset of the individual 368 # task ordering. 369 all_tasks = [] 370 for recipe, new_tasks in task_lists.items(): 371 index = 0 372 old_task = all_tasks[index] if index < len(all_tasks) else None 373 for new_task in new_tasks: 374 if old_task == new_task: 375 # Common task, skip it. This is the fast-path which 376 # avoids a full search. 377 index += 1 378 old_task = all_tasks[index] if index < len(all_tasks) else None 379 else: 380 try: 381 index = all_tasks.index(new_task) 382 # Already present, just not at the current 383 # place. We re-synchronized by changing the 384 # index so that it matches again. Now 385 # move on to the next existing task. 386 index += 1 387 old_task = all_tasks[index] if index < len(all_tasks) else None 388 except ValueError: 389 # Not present. Insert before old_task, which 390 # remains the same (but gets shifted back). 391 all_tasks.insert(index, new_task) 392 index += 1 393 bb.debug(3, 'merged task list: %s' % all_tasks) 394 395 # Now reverse the order so that tasks that finish the work on one 396 # recipe are considered more imporant (= come first). The ordering 397 # is now so that do_build is most important. 398 all_tasks.reverse() 399 400 # Group tasks of the same kind before tasks of less important 401 # kinds at the head of the queue (because earlier = lower 402 # priority number = runs earlier), while preserving the 403 # ordering by recipe. If recipe foo is more important than 404 # bar, then the goal is to work on foo's do_populate_sysroot 405 # before bar's do_populate_sysroot and on the more important 406 # tasks of foo before any of the less important tasks in any 407 # other recipe (if those other recipes are more important than 408 # foo). 409 # 410 # All of this only applies when tasks are runable. Explicit 411 # dependencies still override this ordering by priority. 412 # 413 # Here's an example why this priority re-ordering helps with 414 # minimizing disk usage. Consider a recipe foo with a higher 415 # priority than bar where foo DEPENDS on bar. Then the 416 # implicit rule (from base.bbclass) is that foo's do_configure 417 # depends on bar's do_populate_sysroot. This ensures that 418 # bar's do_populate_sysroot gets done first. Normally the 419 # tasks from foo would continue to run once that is done, and 420 # bar only gets completed and cleaned up later. By ordering 421 # bar's task that depend on bar's do_populate_sysroot before foo's 422 # do_configure, that problem gets avoided. 423 task_index = 0 424 self.dump_prio('original priorities') 425 for task in all_tasks: 426 for index in range(task_index, self.numTasks): 427 taskid = self.prio_map[index] 428 taskname = taskid.rsplit(':', 1)[1] 429 if taskname == task: 430 del self.prio_map[index] 431 self.prio_map.insert(task_index, taskid) 432 task_index += 1 433 self.dump_prio('completion priorities') 434 435class RunTaskEntry(object): 436 def __init__(self): 437 self.depends = set() 438 self.revdeps = set() 439 self.hash = None 440 self.unihash = None 441 self.task = None 442 self.weight = 1 443 444class RunQueueData: 445 """ 446 BitBake Run Queue implementation 447 """ 448 def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets): 449 self.cooker = cooker 450 self.dataCaches = dataCaches 451 self.taskData = taskData 452 self.targets = targets 453 self.rq = rq 454 self.warn_multi_bb = False 455 456 self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split() 457 self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets) 458 self.setscene_ignore_tasks_checked = False 459 self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1") 460 self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter() 461 462 self.reset() 463 464 def reset(self): 465 self.runtaskentries = {} 466 467 def runq_depends_names(self, ids): 468 import re 469 ret = [] 470 for id in ids: 471 nam = os.path.basename(id) 472 nam = re.sub("_[^,]*,", ",", nam) 473 ret.extend([nam]) 474 return ret 475 476 def get_task_hash(self, tid): 477 return self.runtaskentries[tid].hash 478 479 def get_task_unihash(self, tid): 480 return self.runtaskentries[tid].unihash 481 482 def get_user_idstring(self, tid, task_name_suffix = ""): 483 return tid + task_name_suffix 484 485 def get_short_user_idstring(self, task, task_name_suffix = ""): 486 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 487 pn = self.dataCaches[mc].pkg_fn[taskfn] 488 taskname = taskname_from_tid(task) + task_name_suffix 489 return "%s:%s" % (pn, taskname) 490 491 def circular_depchains_handler(self, tasks): 492 """ 493 Some tasks aren't buildable, likely due to circular dependency issues. 494 Identify the circular dependencies and print them in a user readable format. 495 """ 496 from copy import deepcopy 497 498 valid_chains = [] 499 explored_deps = {} 500 msgs = [] 501 502 class TooManyLoops(Exception): 503 pass 504 505 def chain_reorder(chain): 506 """ 507 Reorder a dependency chain so the lowest task id is first 508 """ 509 lowest = 0 510 new_chain = [] 511 for entry in range(len(chain)): 512 if chain[entry] < chain[lowest]: 513 lowest = entry 514 new_chain.extend(chain[lowest:]) 515 new_chain.extend(chain[:lowest]) 516 return new_chain 517 518 def chain_compare_equal(chain1, chain2): 519 """ 520 Compare two dependency chains and see if they're the same 521 """ 522 if len(chain1) != len(chain2): 523 return False 524 for index in range(len(chain1)): 525 if chain1[index] != chain2[index]: 526 return False 527 return True 528 529 def chain_array_contains(chain, chain_array): 530 """ 531 Return True if chain_array contains chain 532 """ 533 for ch in chain_array: 534 if chain_compare_equal(ch, chain): 535 return True 536 return False 537 538 def find_chains(tid, prev_chain): 539 prev_chain.append(tid) 540 total_deps = [] 541 total_deps.extend(self.runtaskentries[tid].revdeps) 542 for revdep in self.runtaskentries[tid].revdeps: 543 if revdep in prev_chain: 544 idx = prev_chain.index(revdep) 545 # To prevent duplicates, reorder the chain to start with the lowest taskid 546 # and search through an array of those we've already printed 547 chain = prev_chain[idx:] 548 new_chain = chain_reorder(chain) 549 if not chain_array_contains(new_chain, valid_chains): 550 valid_chains.append(new_chain) 551 msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) 552 for dep in new_chain: 553 msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends))) 554 msgs.append("\n") 555 if len(valid_chains) > 10: 556 msgs.append("Halted dependency loops search after 10 matches.\n") 557 raise TooManyLoops 558 continue 559 scan = False 560 if revdep not in explored_deps: 561 scan = True 562 elif revdep in explored_deps[revdep]: 563 scan = True 564 else: 565 for dep in prev_chain: 566 if dep in explored_deps[revdep]: 567 scan = True 568 if scan: 569 find_chains(revdep, copy.deepcopy(prev_chain)) 570 for dep in explored_deps[revdep]: 571 if dep not in total_deps: 572 total_deps.append(dep) 573 574 explored_deps[tid] = total_deps 575 576 try: 577 for task in tasks: 578 find_chains(task, []) 579 except TooManyLoops: 580 pass 581 582 return msgs 583 584 def calculate_task_weights(self, endpoints): 585 """ 586 Calculate a number representing the "weight" of each task. Heavier weighted tasks 587 have more dependencies and hence should be executed sooner for maximum speed. 588 589 This function also sanity checks the task list finding tasks that are not 590 possible to execute due to circular dependencies. 591 """ 592 593 numTasks = len(self.runtaskentries) 594 weight = {} 595 deps_left = {} 596 task_done = {} 597 598 for tid in self.runtaskentries: 599 task_done[tid] = False 600 weight[tid] = 1 601 deps_left[tid] = len(self.runtaskentries[tid].revdeps) 602 603 for tid in endpoints: 604 weight[tid] = 10 605 task_done[tid] = True 606 607 while True: 608 next_points = [] 609 for tid in endpoints: 610 for revdep in self.runtaskentries[tid].depends: 611 weight[revdep] = weight[revdep] + weight[tid] 612 deps_left[revdep] = deps_left[revdep] - 1 613 if deps_left[revdep] == 0: 614 next_points.append(revdep) 615 task_done[revdep] = True 616 endpoints = next_points 617 if not next_points: 618 break 619 620 # Circular dependency sanity check 621 problem_tasks = [] 622 for tid in self.runtaskentries: 623 if task_done[tid] is False or deps_left[tid] != 0: 624 problem_tasks.append(tid) 625 logger.debug2("Task %s is not buildable", tid) 626 logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) 627 self.runtaskentries[tid].weight = weight[tid] 628 629 if problem_tasks: 630 message = "%s unbuildable tasks were found.\n" % len(problem_tasks) 631 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" 632 message = message + "Identifying dependency loops (this may take a short while)...\n" 633 logger.error(message) 634 635 msgs = self.circular_depchains_handler(problem_tasks) 636 637 message = "\n" 638 for msg in msgs: 639 message = message + msg 640 bb.msg.fatal("RunQueue", message) 641 642 return weight 643 644 def prepare(self): 645 """ 646 Turn a set of taskData into a RunQueue and compute data needed 647 to optimise the execution order. 648 """ 649 650 runq_build = {} 651 recursivetasks = {} 652 recursiveitasks = {} 653 recursivetasksselfref = set() 654 655 taskData = self.taskData 656 657 found = False 658 for mc in self.taskData: 659 if taskData[mc].taskentries: 660 found = True 661 break 662 if not found: 663 # Nothing to do 664 return 0 665 666 bb.parse.siggen.setup_datacache(self.dataCaches) 667 668 self.init_progress_reporter.start() 669 self.init_progress_reporter.next_stage() 670 bb.event.check_for_interrupts(self.cooker.data) 671 672 # Step A - Work out a list of tasks to run 673 # 674 # Taskdata gives us a list of possible providers for every build and run 675 # target ordered by priority. It also gives information on each of those 676 # providers. 677 # 678 # To create the actual list of tasks to execute we fix the list of 679 # providers and then resolve the dependencies into task IDs. This 680 # process is repeated for each type of dependency (tdepends, deptask, 681 # rdeptast, recrdeptask, idepends). 682 683 def add_build_dependencies(depids, tasknames, depends, mc): 684 for depname in depids: 685 # Won't be in build_targets if ASSUME_PROVIDED 686 if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]: 687 continue 688 depdata = taskData[mc].build_targets[depname][0] 689 if depdata is None: 690 continue 691 for taskname in tasknames: 692 t = depdata + ":" + taskname 693 if t in taskData[mc].taskentries: 694 depends.add(t) 695 696 def add_runtime_dependencies(depids, tasknames, depends, mc): 697 for depname in depids: 698 if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]: 699 continue 700 depdata = taskData[mc].run_targets[depname][0] 701 if depdata is None: 702 continue 703 for taskname in tasknames: 704 t = depdata + ":" + taskname 705 if t in taskData[mc].taskentries: 706 depends.add(t) 707 708 def add_mc_dependencies(mc, tid): 709 mcdeps = taskData[mc].get_mcdepends() 710 for dep in mcdeps: 711 mcdependency = dep.split(':') 712 pn = mcdependency[3] 713 frommc = mcdependency[1] 714 mcdep = mcdependency[2] 715 deptask = mcdependency[4] 716 if mcdep not in taskData: 717 bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep)) 718 if mc == frommc: 719 fn = taskData[mcdep].build_targets[pn][0] 720 newdep = '%s:%s' % (fn,deptask) 721 taskData[mc].taskentries[tid].tdepends.append(newdep) 722 723 for mc in taskData: 724 for tid in taskData[mc].taskentries: 725 726 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 727 #runtid = build_tid(mc, fn, taskname) 728 729 #logger.debug2("Processing %s,%s:%s", mc, fn, taskname) 730 731 depends = set() 732 task_deps = self.dataCaches[mc].task_deps[taskfn] 733 734 self.runtaskentries[tid] = RunTaskEntry() 735 736 if fn in taskData[mc].failed_fns: 737 continue 738 739 # We add multiconfig dependencies before processing internal task deps (tdepends) 740 if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']: 741 add_mc_dependencies(mc, tid) 742 743 # Resolve task internal dependencies 744 # 745 # e.g. addtask before X after Y 746 for t in taskData[mc].taskentries[tid].tdepends: 747 (depmc, depfn, deptaskname, _) = split_tid_mcfn(t) 748 depends.add(build_tid(depmc, depfn, deptaskname)) 749 750 # Resolve 'deptask' dependencies 751 # 752 # e.g. do_sometask[deptask] = "do_someothertask" 753 # (makes sure sometask runs after someothertask of all DEPENDS) 754 if 'deptask' in task_deps and taskname in task_deps['deptask']: 755 tasknames = task_deps['deptask'][taskname].split() 756 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 757 758 # Resolve 'rdeptask' dependencies 759 # 760 # e.g. do_sometask[rdeptask] = "do_someothertask" 761 # (makes sure sometask runs after someothertask of all RDEPENDS) 762 if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']: 763 tasknames = task_deps['rdeptask'][taskname].split() 764 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 765 766 # Resolve inter-task dependencies 767 # 768 # e.g. do_sometask[depends] = "targetname:do_someothertask" 769 # (makes sure sometask runs after targetname's someothertask) 770 idepends = taskData[mc].taskentries[tid].idepends 771 for (depname, idependtask) in idepends: 772 if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps: 773 # Won't be in build_targets if ASSUME_PROVIDED 774 depdata = taskData[mc].build_targets[depname][0] 775 if depdata is not None: 776 t = depdata + ":" + idependtask 777 depends.add(t) 778 if t not in taskData[mc].taskentries: 779 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 780 irdepends = taskData[mc].taskentries[tid].irdepends 781 for (depname, idependtask) in irdepends: 782 if depname in taskData[mc].run_targets: 783 # Won't be in run_targets if ASSUME_PROVIDED 784 if not taskData[mc].run_targets[depname]: 785 continue 786 depdata = taskData[mc].run_targets[depname][0] 787 if depdata is not None: 788 t = depdata + ":" + idependtask 789 depends.add(t) 790 if t not in taskData[mc].taskentries: 791 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 792 793 # Resolve recursive 'recrdeptask' dependencies (Part A) 794 # 795 # e.g. do_sometask[recrdeptask] = "do_someothertask" 796 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 797 # We cover the recursive part of the dependencies below 798 if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']: 799 tasknames = task_deps['recrdeptask'][taskname].split() 800 recursivetasks[tid] = tasknames 801 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 802 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 803 if taskname in tasknames: 804 recursivetasksselfref.add(tid) 805 806 if 'recideptask' in task_deps and taskname in task_deps['recideptask']: 807 recursiveitasks[tid] = [] 808 for t in task_deps['recideptask'][taskname].split(): 809 newdep = build_tid(mc, fn, t) 810 recursiveitasks[tid].append(newdep) 811 812 self.runtaskentries[tid].depends = depends 813 # Remove all self references 814 self.runtaskentries[tid].depends.discard(tid) 815 816 #self.dump_data() 817 818 self.init_progress_reporter.next_stage() 819 bb.event.check_for_interrupts(self.cooker.data) 820 821 # Resolve recursive 'recrdeptask' dependencies (Part B) 822 # 823 # e.g. do_sometask[recrdeptask] = "do_someothertask" 824 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 825 # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed 826 827 # Generating/interating recursive lists of dependencies is painful and potentially slow 828 # Precompute recursive task dependencies here by: 829 # a) create a temp list of reverse dependencies (revdeps) 830 # b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0) 831 # c) combine the total list of dependencies in cumulativedeps 832 # d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower) 833 834 835 revdeps = {} 836 deps = {} 837 cumulativedeps = {} 838 for tid in self.runtaskentries: 839 deps[tid] = set(self.runtaskentries[tid].depends) 840 revdeps[tid] = set() 841 cumulativedeps[tid] = set() 842 # Generate a temp list of reverse dependencies 843 for tid in self.runtaskentries: 844 for dep in self.runtaskentries[tid].depends: 845 revdeps[dep].add(tid) 846 # Find the dependency chain endpoints 847 endpoints = set() 848 for tid in self.runtaskentries: 849 if not deps[tid]: 850 endpoints.add(tid) 851 # Iterate the chains collating dependencies 852 while endpoints: 853 next = set() 854 for tid in endpoints: 855 for dep in revdeps[tid]: 856 cumulativedeps[dep].add(fn_from_tid(tid)) 857 cumulativedeps[dep].update(cumulativedeps[tid]) 858 if tid in deps[dep]: 859 deps[dep].remove(tid) 860 if not deps[dep]: 861 next.add(dep) 862 endpoints = next 863 #for tid in deps: 864 # if deps[tid]: 865 # bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid])) 866 867 # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to 868 # resolve these recursively until we aren't adding any further extra dependencies 869 extradeps = True 870 while extradeps: 871 extradeps = 0 872 for tid in recursivetasks: 873 tasknames = recursivetasks[tid] 874 875 totaldeps = set(self.runtaskentries[tid].depends) 876 if tid in recursiveitasks: 877 totaldeps.update(recursiveitasks[tid]) 878 for dep in recursiveitasks[tid]: 879 if dep not in self.runtaskentries: 880 continue 881 totaldeps.update(self.runtaskentries[dep].depends) 882 883 deps = set() 884 for dep in totaldeps: 885 if dep in cumulativedeps: 886 deps.update(cumulativedeps[dep]) 887 888 for t in deps: 889 for taskname in tasknames: 890 newtid = t + ":" + taskname 891 if newtid == tid: 892 continue 893 if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends: 894 extradeps += 1 895 self.runtaskentries[tid].depends.add(newtid) 896 897 # Handle recursive tasks which depend upon other recursive tasks 898 deps = set() 899 for dep in self.runtaskentries[tid].depends.intersection(recursivetasks): 900 deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends)) 901 for newtid in deps: 902 for taskname in tasknames: 903 if not newtid.endswith(":" + taskname): 904 continue 905 if newtid in self.runtaskentries: 906 extradeps += 1 907 self.runtaskentries[tid].depends.add(newtid) 908 909 bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps) 910 911 # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work 912 for tid in recursivetasksselfref: 913 self.runtaskentries[tid].depends.difference_update(recursivetasksselfref) 914 915 self.init_progress_reporter.next_stage() 916 bb.event.check_for_interrupts(self.cooker.data) 917 918 #self.dump_data() 919 920 # Step B - Mark all active tasks 921 # 922 # Start with the tasks we were asked to run and mark all dependencies 923 # as active too. If the task is to be 'forced', clear its stamp. Once 924 # all active tasks are marked, prune the ones we don't need. 925 926 logger.verbose("Marking Active Tasks") 927 928 def mark_active(tid, depth): 929 """ 930 Mark an item as active along with its depends 931 (calls itself recursively) 932 """ 933 934 if tid in runq_build: 935 return 936 937 runq_build[tid] = 1 938 939 depends = self.runtaskentries[tid].depends 940 for depend in depends: 941 mark_active(depend, depth+1) 942 943 def invalidate_task(tid, error_nostamp): 944 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 945 taskdep = self.dataCaches[mc].task_deps[taskfn] 946 if fn + ":" + taskname not in taskData[mc].taskentries: 947 logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) 948 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 949 if error_nostamp: 950 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) 951 else: 952 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) 953 else: 954 logger.verbose("Invalidate task %s, %s", taskname, fn) 955 bb.parse.siggen.invalidate_task(taskname, taskfn) 956 957 self.target_tids = [] 958 for (mc, target, task, fn) in self.targets: 959 960 if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]: 961 continue 962 963 if target in taskData[mc].failed_deps: 964 continue 965 966 parents = False 967 if task.endswith('-'): 968 parents = True 969 task = task[:-1] 970 971 if fn in taskData[mc].failed_fns: 972 continue 973 974 # fn already has mc prefix 975 tid = fn + ":" + task 976 self.target_tids.append(tid) 977 if tid not in taskData[mc].taskentries: 978 import difflib 979 tasks = [] 980 for x in taskData[mc].taskentries: 981 if x.startswith(fn + ":"): 982 tasks.append(taskname_from_tid(x)) 983 close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7) 984 if close_matches: 985 extra = ". Close matches:\n %s" % "\n ".join(close_matches) 986 else: 987 extra = "" 988 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra)) 989 990 # For tasks called "XXXX-", ony run their dependencies 991 if parents: 992 for i in self.runtaskentries[tid].depends: 993 mark_active(i, 1) 994 else: 995 mark_active(tid, 1) 996 997 self.init_progress_reporter.next_stage() 998 bb.event.check_for_interrupts(self.cooker.data) 999 1000 # Step C - Prune all inactive tasks 1001 # 1002 # Once all active tasks are marked, prune the ones we don't need. 1003 1004 # Handle --runall 1005 if self.cooker.configuration.runall: 1006 # re-run the mark_active and then drop unused tasks from new list 1007 reduced_tasklist = set(self.runtaskentries.keys()) 1008 for tid in list(self.runtaskentries.keys()): 1009 if tid not in runq_build: 1010 reduced_tasklist.remove(tid) 1011 runq_build = {} 1012 1013 for task in self.cooker.configuration.runall: 1014 if not task.startswith("do_"): 1015 task = "do_{0}".format(task) 1016 runall_tids = set() 1017 for tid in reduced_tasklist: 1018 wanttid = "{0}:{1}".format(fn_from_tid(tid), task) 1019 if wanttid in self.runtaskentries: 1020 runall_tids.add(wanttid) 1021 1022 for tid in list(runall_tids): 1023 mark_active(tid, 1) 1024 if self.cooker.configuration.force: 1025 invalidate_task(tid, False) 1026 1027 delcount = set() 1028 for tid in list(self.runtaskentries.keys()): 1029 if tid not in runq_build: 1030 delcount.add(tid) 1031 del self.runtaskentries[tid] 1032 1033 if self.cooker.configuration.runall: 1034 if not self.runtaskentries: 1035 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets))) 1036 1037 self.init_progress_reporter.next_stage() 1038 bb.event.check_for_interrupts(self.cooker.data) 1039 1040 # Handle runonly 1041 if self.cooker.configuration.runonly: 1042 # re-run the mark_active and then drop unused tasks from new list 1043 runq_build = {} 1044 1045 for task in self.cooker.configuration.runonly: 1046 if not task.startswith("do_"): 1047 task = "do_{0}".format(task) 1048 runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task] 1049 1050 for tid in runonly_tids: 1051 mark_active(tid, 1) 1052 if self.cooker.configuration.force: 1053 invalidate_task(tid, False) 1054 1055 for tid in list(self.runtaskentries.keys()): 1056 if tid not in runq_build: 1057 delcount.add(tid) 1058 del self.runtaskentries[tid] 1059 1060 if not self.runtaskentries: 1061 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets))) 1062 1063 # 1064 # Step D - Sanity checks and computation 1065 # 1066 1067 # Check to make sure we still have tasks to run 1068 if not self.runtaskentries: 1069 if not taskData[''].halt: 1070 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") 1071 else: 1072 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") 1073 1074 logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries)) 1075 1076 logger.verbose("Assign Weightings") 1077 1078 self.init_progress_reporter.next_stage() 1079 bb.event.check_for_interrupts(self.cooker.data) 1080 1081 # Generate a list of reverse dependencies to ease future calculations 1082 for tid in self.runtaskentries: 1083 for dep in self.runtaskentries[tid].depends: 1084 self.runtaskentries[dep].revdeps.add(tid) 1085 1086 self.init_progress_reporter.next_stage() 1087 bb.event.check_for_interrupts(self.cooker.data) 1088 1089 # Identify tasks at the end of dependency chains 1090 # Error on circular dependency loops (length two) 1091 endpoints = [] 1092 for tid in self.runtaskentries: 1093 revdeps = self.runtaskentries[tid].revdeps 1094 if not revdeps: 1095 endpoints.append(tid) 1096 for dep in revdeps: 1097 if dep in self.runtaskentries[tid].depends: 1098 bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep)) 1099 1100 1101 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) 1102 1103 self.init_progress_reporter.next_stage() 1104 bb.event.check_for_interrupts(self.cooker.data) 1105 1106 # Calculate task weights 1107 # Check of higher length circular dependencies 1108 self.runq_weight = self.calculate_task_weights(endpoints) 1109 1110 self.init_progress_reporter.next_stage() 1111 bb.event.check_for_interrupts(self.cooker.data) 1112 1113 # Sanity Check - Check for multiple tasks building the same provider 1114 for mc in self.dataCaches: 1115 prov_list = {} 1116 seen_fn = [] 1117 for tid in self.runtaskentries: 1118 (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1119 if taskfn in seen_fn: 1120 continue 1121 if mc != tidmc: 1122 continue 1123 seen_fn.append(taskfn) 1124 for prov in self.dataCaches[mc].fn_provides[taskfn]: 1125 if prov not in prov_list: 1126 prov_list[prov] = [taskfn] 1127 elif taskfn not in prov_list[prov]: 1128 prov_list[prov].append(taskfn) 1129 for prov in prov_list: 1130 if len(prov_list[prov]) < 2: 1131 continue 1132 if prov in self.multi_provider_allowed: 1133 continue 1134 seen_pn = [] 1135 # If two versions of the same PN are being built its fatal, we don't support it. 1136 for fn in prov_list[prov]: 1137 pn = self.dataCaches[mc].pkg_fn[fn] 1138 if pn not in seen_pn: 1139 seen_pn.append(pn) 1140 else: 1141 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) 1142 msgs = ["Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))] 1143 # 1144 # Construct a list of things which uniquely depend on each provider 1145 # since this may help the user figure out which dependency is triggering this warning 1146 # 1147 msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.") 1148 deplist = {} 1149 commondeps = None 1150 for provfn in prov_list[prov]: 1151 deps = set() 1152 for tid in self.runtaskentries: 1153 fn = fn_from_tid(tid) 1154 if fn != provfn: 1155 continue 1156 for dep in self.runtaskentries[tid].revdeps: 1157 fn = fn_from_tid(dep) 1158 if fn == provfn: 1159 continue 1160 deps.add(dep) 1161 if not commondeps: 1162 commondeps = set(deps) 1163 else: 1164 commondeps &= deps 1165 deplist[provfn] = deps 1166 for provfn in deplist: 1167 msgs.append("\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps))) 1168 # 1169 # Construct a list of provides and runtime providers for each recipe 1170 # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC) 1171 # 1172 msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.") 1173 provide_results = {} 1174 rprovide_results = {} 1175 commonprovs = None 1176 commonrprovs = None 1177 for provfn in prov_list[prov]: 1178 provides = set(self.dataCaches[mc].fn_provides[provfn]) 1179 rprovides = set() 1180 for rprovide in self.dataCaches[mc].rproviders: 1181 if provfn in self.dataCaches[mc].rproviders[rprovide]: 1182 rprovides.add(rprovide) 1183 for package in self.dataCaches[mc].packages: 1184 if provfn in self.dataCaches[mc].packages[package]: 1185 rprovides.add(package) 1186 for package in self.dataCaches[mc].packages_dynamic: 1187 if provfn in self.dataCaches[mc].packages_dynamic[package]: 1188 rprovides.add(package) 1189 if not commonprovs: 1190 commonprovs = set(provides) 1191 else: 1192 commonprovs &= provides 1193 provide_results[provfn] = provides 1194 if not commonrprovs: 1195 commonrprovs = set(rprovides) 1196 else: 1197 commonrprovs &= rprovides 1198 rprovide_results[provfn] = rprovides 1199 #msgs.append("\nCommon provides:\n %s" % ("\n ".join(commonprovs))) 1200 #msgs.append("\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs))) 1201 for provfn in prov_list[prov]: 1202 msgs.append("\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs))) 1203 msgs.append("\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs))) 1204 1205 if self.warn_multi_bb: 1206 logger.verbnote("".join(msgs)) 1207 else: 1208 logger.error("".join(msgs)) 1209 1210 self.init_progress_reporter.next_stage() 1211 self.init_progress_reporter.next_stage() 1212 bb.event.check_for_interrupts(self.cooker.data) 1213 1214 # Iterate over the task list looking for tasks with a 'setscene' function 1215 self.runq_setscene_tids = set() 1216 if not self.cooker.configuration.nosetscene: 1217 for tid in self.runtaskentries: 1218 (mc, fn, taskname, _) = split_tid_mcfn(tid) 1219 setscenetid = tid + "_setscene" 1220 if setscenetid not in taskData[mc].taskentries: 1221 continue 1222 self.runq_setscene_tids.add(tid) 1223 1224 self.init_progress_reporter.next_stage() 1225 bb.event.check_for_interrupts(self.cooker.data) 1226 1227 # Invalidate task if force mode active 1228 if self.cooker.configuration.force: 1229 for tid in self.target_tids: 1230 invalidate_task(tid, False) 1231 1232 # Invalidate task if invalidate mode active 1233 if self.cooker.configuration.invalidate_stamp: 1234 for tid in self.target_tids: 1235 fn = fn_from_tid(tid) 1236 for st in self.cooker.configuration.invalidate_stamp.split(','): 1237 if not st.startswith("do_"): 1238 st = "do_%s" % st 1239 invalidate_task(fn + ":" + st, True) 1240 1241 self.init_progress_reporter.next_stage() 1242 bb.event.check_for_interrupts(self.cooker.data) 1243 1244 # Create and print to the logs a virtual/xxxx -> PN (fn) table 1245 for mc in taskData: 1246 virtmap = taskData[mc].get_providermap(prefix="virtual/") 1247 virtpnmap = {} 1248 for v in virtmap: 1249 virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]] 1250 bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v])) 1251 if hasattr(bb.parse.siggen, "tasks_resolved"): 1252 bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc]) 1253 1254 self.init_progress_reporter.next_stage() 1255 bb.event.check_for_interrupts(self.cooker.data) 1256 1257 bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) 1258 1259 # Iterate over the task list and call into the siggen code 1260 dealtwith = set() 1261 todeal = set(self.runtaskentries) 1262 while todeal: 1263 for tid in todeal.copy(): 1264 if not (self.runtaskentries[tid].depends - dealtwith): 1265 dealtwith.add(tid) 1266 todeal.remove(tid) 1267 self.prepare_task_hash(tid) 1268 bb.event.check_for_interrupts(self.cooker.data) 1269 1270 bb.parse.siggen.writeout_file_checksum_cache() 1271 1272 #self.dump_data() 1273 return len(self.runtaskentries) 1274 1275 def prepare_task_hash(self, tid): 1276 bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) 1277 self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) 1278 self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) 1279 1280 def dump_data(self): 1281 """ 1282 Dump some debug information on the internal data structures 1283 """ 1284 logger.debug3("run_tasks:") 1285 for tid in self.runtaskentries: 1286 logger.debug3(" %s: %s Deps %s RevDeps %s", tid, 1287 self.runtaskentries[tid].weight, 1288 self.runtaskentries[tid].depends, 1289 self.runtaskentries[tid].revdeps) 1290 1291class RunQueueWorker(): 1292 def __init__(self, process, pipe): 1293 self.process = process 1294 self.pipe = pipe 1295 1296class RunQueue: 1297 def __init__(self, cooker, cfgData, dataCaches, taskData, targets): 1298 1299 self.cooker = cooker 1300 self.cfgData = cfgData 1301 self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets) 1302 1303 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None 1304 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None 1305 1306 self.state = runQueuePrepare 1307 1308 # For disk space monitor 1309 # Invoked at regular time intervals via the bitbake heartbeat event 1310 # while the build is running. We generate a unique name for the handler 1311 # here, just in case that there ever is more than one RunQueue instance, 1312 # start the handler when reaching runQueueSceneInit, and stop it when 1313 # done with the build. 1314 self.dm = monitordisk.diskMonitor(cfgData) 1315 self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self)) 1316 self.dm_event_handler_registered = False 1317 self.rqexe = None 1318 self.worker = {} 1319 self.fakeworker = {} 1320 1321 def _start_worker(self, mc, fakeroot = False, rqexec = None): 1322 logger.debug("Starting bitbake-worker") 1323 magic = "decafbad" 1324 if self.cooker.configuration.profile: 1325 magic = "decafbadbad" 1326 fakerootlogs = None 1327 1328 workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker") 1329 if fakeroot: 1330 magic = magic + "beef" 1331 mcdata = self.cooker.databuilder.mcdata[mc] 1332 fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD")) 1333 fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split() 1334 env = os.environ.copy() 1335 for key, value in (var.split('=') for var in fakerootenv): 1336 env[key] = value 1337 worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) 1338 fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs 1339 else: 1340 worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) 1341 bb.utils.nonblockingfd(worker.stdout) 1342 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs) 1343 1344 workerdata = { 1345 "sigdata" : bb.parse.siggen.get_taskdata(), 1346 "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, 1347 "build_verbose_shell" : self.cooker.configuration.build_verbose_shell, 1348 "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout, 1349 "logdefaultdomain" : bb.msg.loggerDefaultDomains, 1350 "prhost" : self.cooker.prhost, 1351 "buildname" : self.cfgData.getVar("BUILDNAME"), 1352 "date" : self.cfgData.getVar("DATE"), 1353 "time" : self.cfgData.getVar("TIME"), 1354 "hashservaddr" : self.cooker.hashservaddr, 1355 "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), 1356 } 1357 1358 worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") 1359 worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") 1360 worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") 1361 worker.stdin.flush() 1362 1363 return RunQueueWorker(worker, workerpipe) 1364 1365 def _teardown_worker(self, worker): 1366 if not worker: 1367 return 1368 logger.debug("Teardown for bitbake-worker") 1369 try: 1370 worker.process.stdin.write(b"<quit></quit>") 1371 worker.process.stdin.flush() 1372 worker.process.stdin.close() 1373 except IOError: 1374 pass 1375 while worker.process.returncode is None: 1376 worker.pipe.read() 1377 worker.process.poll() 1378 while worker.pipe.read(): 1379 continue 1380 worker.pipe.close() 1381 1382 def start_worker(self): 1383 if self.worker: 1384 self.teardown_workers() 1385 self.teardown = False 1386 for mc in self.rqdata.dataCaches: 1387 self.worker[mc] = self._start_worker(mc) 1388 1389 def start_fakeworker(self, rqexec, mc): 1390 if not mc in self.fakeworker: 1391 self.fakeworker[mc] = self._start_worker(mc, True, rqexec) 1392 1393 def teardown_workers(self): 1394 self.teardown = True 1395 for mc in self.worker: 1396 self._teardown_worker(self.worker[mc]) 1397 self.worker = {} 1398 for mc in self.fakeworker: 1399 self._teardown_worker(self.fakeworker[mc]) 1400 self.fakeworker = {} 1401 1402 def read_workers(self): 1403 for mc in self.worker: 1404 self.worker[mc].pipe.read() 1405 for mc in self.fakeworker: 1406 self.fakeworker[mc].pipe.read() 1407 1408 def active_fds(self): 1409 fds = [] 1410 for mc in self.worker: 1411 fds.append(self.worker[mc].pipe.input) 1412 for mc in self.fakeworker: 1413 fds.append(self.fakeworker[mc].pipe.input) 1414 return fds 1415 1416 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): 1417 def get_timestamp(f): 1418 try: 1419 if not os.access(f, os.F_OK): 1420 return None 1421 return os.stat(f)[stat.ST_MTIME] 1422 except: 1423 return None 1424 1425 (mc, fn, tn, taskfn) = split_tid_mcfn(tid) 1426 if taskname is None: 1427 taskname = tn 1428 1429 stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn) 1430 1431 # If the stamp is missing, it's not current 1432 if not os.access(stampfile, os.F_OK): 1433 logger.debug2("Stampfile %s not available", stampfile) 1434 return False 1435 # If it's a 'nostamp' task, it's not current 1436 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1437 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 1438 logger.debug2("%s.%s is nostamp\n", fn, taskname) 1439 return False 1440 1441 if taskname.endswith("_setscene"): 1442 return True 1443 1444 if cache is None: 1445 cache = {} 1446 1447 iscurrent = True 1448 t1 = get_timestamp(stampfile) 1449 for dep in self.rqdata.runtaskentries[tid].depends: 1450 if iscurrent: 1451 (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep) 1452 stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2) 1453 stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2) 1454 t2 = get_timestamp(stampfile2) 1455 t3 = get_timestamp(stampfile3) 1456 if t3 and not t2: 1457 continue 1458 if t3 and t3 > t2: 1459 continue 1460 if fn == fn2: 1461 if not t2: 1462 logger.debug2('Stampfile %s does not exist', stampfile2) 1463 iscurrent = False 1464 break 1465 if t1 < t2: 1466 logger.debug2('Stampfile %s < %s', stampfile, stampfile2) 1467 iscurrent = False 1468 break 1469 if recurse and iscurrent: 1470 if dep in cache: 1471 iscurrent = cache[dep] 1472 if not iscurrent: 1473 logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) 1474 else: 1475 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) 1476 cache[dep] = iscurrent 1477 if recurse: 1478 cache[tid] = iscurrent 1479 return iscurrent 1480 1481 def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True): 1482 valid = set() 1483 if self.hashvalidate: 1484 sq_data = {} 1485 sq_data['hash'] = {} 1486 sq_data['hashfn'] = {} 1487 sq_data['unihash'] = {} 1488 for tid in tocheck: 1489 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1490 sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash 1491 sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn] 1492 sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash 1493 1494 valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary) 1495 1496 return valid 1497 1498 def validate_hash(self, sq_data, d, siginfo, currentcount, summary): 1499 locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary} 1500 1501 # Metadata has **kwargs so args can be added, sq_data can also gain new fields 1502 call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)" 1503 1504 return bb.utils.better_eval(call, locs) 1505 1506 def _execute_runqueue(self): 1507 """ 1508 Run the tasks in a queue prepared by rqdata.prepare() 1509 Upon failure, optionally try to recover the build using any alternate providers 1510 (if the halt on failure configuration option isn't set) 1511 """ 1512 1513 retval = True 1514 bb.event.check_for_interrupts(self.cooker.data) 1515 1516 if self.state is runQueuePrepare: 1517 # NOTE: if you add, remove or significantly refactor the stages of this 1518 # process then you should recalculate the weightings here. This is quite 1519 # easy to do - just change the next line temporarily to pass debug=True as 1520 # the last parameter and you'll get a printout of the weightings as well 1521 # as a map to the lines where next_stage() was called. Of course this isn't 1522 # critical, but it helps to keep the progress reporting accurate. 1523 self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data, 1524 "Initialising tasks", 1525 [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244]) 1526 if self.rqdata.prepare() == 0: 1527 self.state = runQueueComplete 1528 else: 1529 self.state = runQueueSceneInit 1530 bb.parse.siggen.save_unitaskhashes() 1531 1532 if self.state is runQueueSceneInit: 1533 self.rqdata.init_progress_reporter.next_stage() 1534 1535 # we are ready to run, emit dependency info to any UI or class which 1536 # needs it 1537 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) 1538 self.rqdata.init_progress_reporter.next_stage() 1539 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) 1540 1541 if not self.dm_event_handler_registered: 1542 res = bb.event.register(self.dm_event_handler_name, 1543 lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False, 1544 ('bb.event.HeartbeatEvent',), data=self.cfgData) 1545 self.dm_event_handler_registered = True 1546 1547 dump = self.cooker.configuration.dump_signatures 1548 if dump: 1549 self.rqdata.init_progress_reporter.finish() 1550 if 'printdiff' in dump: 1551 invalidtasks = self.print_diffscenetasks() 1552 self.dump_signatures(dump) 1553 if 'printdiff' in dump: 1554 self.write_diffscenetasks(invalidtasks) 1555 self.state = runQueueComplete 1556 1557 if self.state is runQueueSceneInit: 1558 self.rqdata.init_progress_reporter.next_stage() 1559 self.start_worker() 1560 self.rqdata.init_progress_reporter.next_stage() 1561 self.rqexe = RunQueueExecute(self) 1562 1563 # If we don't have any setscene functions, skip execution 1564 if not self.rqdata.runq_setscene_tids: 1565 logger.info('No setscene tasks') 1566 for tid in self.rqdata.runtaskentries: 1567 if not self.rqdata.runtaskentries[tid].depends: 1568 self.rqexe.setbuildable(tid) 1569 self.rqexe.tasks_notcovered.add(tid) 1570 self.rqexe.sqdone = True 1571 logger.info('Executing Tasks') 1572 self.state = runQueueRunning 1573 1574 if self.state is runQueueRunning: 1575 retval = self.rqexe.execute() 1576 1577 if self.state is runQueueCleanUp: 1578 retval = self.rqexe.finish() 1579 1580 build_done = self.state is runQueueComplete or self.state is runQueueFailed 1581 1582 if build_done and self.dm_event_handler_registered: 1583 bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData) 1584 self.dm_event_handler_registered = False 1585 1586 if build_done and self.rqexe: 1587 bb.parse.siggen.save_unitaskhashes() 1588 self.teardown_workers() 1589 if self.rqexe: 1590 if self.rqexe.stats.failed: 1591 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) 1592 else: 1593 # Let's avoid the word "failed" if nothing actually did 1594 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) 1595 1596 if self.state is runQueueFailed: 1597 raise bb.runqueue.TaskFailure(self.rqexe.failed_tids) 1598 1599 if self.state is runQueueComplete: 1600 # All done 1601 return False 1602 1603 # Loop 1604 return retval 1605 1606 def execute_runqueue(self): 1607 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop. 1608 try: 1609 return self._execute_runqueue() 1610 except bb.runqueue.TaskFailure: 1611 raise 1612 except SystemExit: 1613 raise 1614 except bb.BBHandledException: 1615 try: 1616 self.teardown_workers() 1617 except: 1618 pass 1619 self.state = runQueueComplete 1620 raise 1621 except Exception as err: 1622 logger.exception("An uncaught exception occurred in runqueue") 1623 try: 1624 self.teardown_workers() 1625 except: 1626 pass 1627 self.state = runQueueComplete 1628 raise 1629 1630 def finish_runqueue(self, now = False): 1631 if not self.rqexe: 1632 self.state = runQueueComplete 1633 return 1634 1635 if now: 1636 self.rqexe.finish_now() 1637 else: 1638 self.rqexe.finish() 1639 1640 def _rq_dump_sigtid(self, tids): 1641 for tid in tids: 1642 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1643 dataCaches = self.rqdata.dataCaches 1644 bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True) 1645 1646 def dump_signatures(self, options): 1647 if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset: 1648 bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled") 1649 1650 bb.note("Writing task signature files") 1651 1652 max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1) 1653 def chunkify(l, n): 1654 return [l[i::n] for i in range(n)] 1655 tids = chunkify(list(self.rqdata.runtaskentries), max_process) 1656 # We cannot use the real multiprocessing.Pool easily due to some local data 1657 # that can't be pickled. This is a cheap multi-process solution. 1658 launched = [] 1659 while tids: 1660 if len(launched) < max_process: 1661 p = Process(target=self._rq_dump_sigtid, args=(tids.pop(), )) 1662 p.start() 1663 launched.append(p) 1664 for q in launched: 1665 # The finished processes are joined when calling is_alive() 1666 if not q.is_alive(): 1667 launched.remove(q) 1668 for p in launched: 1669 p.join() 1670 1671 bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options) 1672 1673 return 1674 1675 def print_diffscenetasks(self): 1676 1677 noexec = [] 1678 tocheck = set() 1679 1680 for tid in self.rqdata.runtaskentries: 1681 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1682 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1683 1684 if 'noexec' in taskdep and taskname in taskdep['noexec']: 1685 noexec.append(tid) 1686 continue 1687 1688 tocheck.add(tid) 1689 1690 valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False) 1691 1692 # Tasks which are both setscene and noexec never care about dependencies 1693 # We therefore find tasks which are setscene and noexec and mark their 1694 # unique dependencies as valid. 1695 for tid in noexec: 1696 if tid not in self.rqdata.runq_setscene_tids: 1697 continue 1698 for dep in self.rqdata.runtaskentries[tid].depends: 1699 hasnoexecparents = True 1700 for dep2 in self.rqdata.runtaskentries[dep].revdeps: 1701 if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec: 1702 continue 1703 hasnoexecparents = False 1704 break 1705 if hasnoexecparents: 1706 valid_new.add(dep) 1707 1708 invalidtasks = set() 1709 for tid in self.rqdata.runtaskentries: 1710 if tid not in valid_new and tid not in noexec: 1711 invalidtasks.add(tid) 1712 1713 found = set() 1714 processed = set() 1715 for tid in invalidtasks: 1716 toprocess = set([tid]) 1717 while toprocess: 1718 next = set() 1719 for t in toprocess: 1720 for dep in self.rqdata.runtaskentries[t].depends: 1721 if dep in invalidtasks: 1722 found.add(tid) 1723 if dep not in processed: 1724 processed.add(dep) 1725 next.add(dep) 1726 toprocess = next 1727 if tid in found: 1728 toprocess = set() 1729 1730 tasklist = [] 1731 for tid in invalidtasks.difference(found): 1732 tasklist.append(tid) 1733 1734 if tasklist: 1735 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) 1736 1737 return invalidtasks.difference(found) 1738 1739 def write_diffscenetasks(self, invalidtasks): 1740 1741 # Define recursion callback 1742 def recursecb(key, hash1, hash2): 1743 hashes = [hash1, hash2] 1744 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) 1745 1746 recout = [] 1747 if len(hashfiles) == 2: 1748 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb) 1749 recout.extend(list(' ' + l for l in out2)) 1750 else: 1751 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) 1752 1753 return recout 1754 1755 1756 for tid in invalidtasks: 1757 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1758 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1759 h = self.rqdata.runtaskentries[tid].hash 1760 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc]) 1761 match = None 1762 for m in matches: 1763 if h in m: 1764 match = m 1765 if match is None: 1766 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h) 1767 matches = {k : v for k, v in iter(matches.items()) if h not in k} 1768 if matches: 1769 latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1] 1770 prevh = __find_sha256__.search(latestmatch).group(0) 1771 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) 1772 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) 1773 1774 1775class RunQueueExecute: 1776 1777 def __init__(self, rq): 1778 self.rq = rq 1779 self.cooker = rq.cooker 1780 self.cfgData = rq.cfgData 1781 self.rqdata = rq.rqdata 1782 1783 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) 1784 self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" 1785 self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU") 1786 self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO") 1787 self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY") 1788 1789 self.sq_buildable = set() 1790 self.sq_running = set() 1791 self.sq_live = set() 1792 1793 self.updated_taskhash_queue = [] 1794 self.pending_migrations = set() 1795 1796 self.runq_buildable = set() 1797 self.runq_running = set() 1798 self.runq_complete = set() 1799 self.runq_tasksrun = set() 1800 1801 self.build_stamps = {} 1802 self.build_stamps2 = [] 1803 self.failed_tids = [] 1804 self.sq_deferred = {} 1805 1806 self.stampcache = {} 1807 1808 self.holdoff_tasks = set() 1809 self.holdoff_need_update = True 1810 self.sqdone = False 1811 1812 self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids)) 1813 1814 for mc in rq.worker: 1815 rq.worker[mc].pipe.setrunqueueexec(self) 1816 for mc in rq.fakeworker: 1817 rq.fakeworker[mc].pipe.setrunqueueexec(self) 1818 1819 if self.number_tasks <= 0: 1820 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) 1821 1822 lower_limit = 1.0 1823 upper_limit = 1000000.0 1824 if self.max_cpu_pressure: 1825 self.max_cpu_pressure = float(self.max_cpu_pressure) 1826 if self.max_cpu_pressure < lower_limit: 1827 bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit)) 1828 if self.max_cpu_pressure > upper_limit: 1829 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure)) 1830 1831 if self.max_io_pressure: 1832 self.max_io_pressure = float(self.max_io_pressure) 1833 if self.max_io_pressure < lower_limit: 1834 bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit)) 1835 if self.max_io_pressure > upper_limit: 1836 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1837 1838 if self.max_memory_pressure: 1839 self.max_memory_pressure = float(self.max_memory_pressure) 1840 if self.max_memory_pressure < lower_limit: 1841 bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit)) 1842 if self.max_memory_pressure > upper_limit: 1843 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1844 1845 # List of setscene tasks which we've covered 1846 self.scenequeue_covered = set() 1847 # List of tasks which are covered (including setscene ones) 1848 self.tasks_covered = set() 1849 self.tasks_scenequeue_done = set() 1850 self.scenequeue_notcovered = set() 1851 self.tasks_notcovered = set() 1852 self.scenequeue_notneeded = set() 1853 1854 # We can't skip specified target tasks which aren't setscene tasks 1855 self.cantskip = set(self.rqdata.target_tids) 1856 self.cantskip.difference_update(self.rqdata.runq_setscene_tids) 1857 self.cantskip.intersection_update(self.rqdata.runtaskentries) 1858 1859 schedulers = self.get_schedulers() 1860 for scheduler in schedulers: 1861 if self.scheduler == scheduler.name: 1862 self.sched = scheduler(self, self.rqdata) 1863 logger.debug("Using runqueue scheduler '%s'", scheduler.name) 1864 break 1865 else: 1866 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1867 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1868 1869 #if self.rqdata.runq_setscene_tids: 1870 self.sqdata = SQData() 1871 build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self) 1872 1873 def runqueue_process_waitpid(self, task, status, fakerootlog=None): 1874 1875 # self.build_stamps[pid] may not exist when use shared work directory. 1876 if task in self.build_stamps: 1877 self.build_stamps2.remove(self.build_stamps[task]) 1878 del self.build_stamps[task] 1879 1880 if task in self.sq_live: 1881 if status != 0: 1882 self.sq_task_fail(task, status) 1883 else: 1884 self.sq_task_complete(task) 1885 self.sq_live.remove(task) 1886 self.stats.updateActiveSetscene(len(self.sq_live)) 1887 else: 1888 if status != 0: 1889 self.task_fail(task, status, fakerootlog=fakerootlog) 1890 else: 1891 self.task_complete(task) 1892 return True 1893 1894 def finish_now(self): 1895 for mc in self.rq.worker: 1896 try: 1897 self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") 1898 self.rq.worker[mc].process.stdin.flush() 1899 except IOError: 1900 # worker must have died? 1901 pass 1902 for mc in self.rq.fakeworker: 1903 try: 1904 self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") 1905 self.rq.fakeworker[mc].process.stdin.flush() 1906 except IOError: 1907 # worker must have died? 1908 pass 1909 1910 if self.failed_tids: 1911 self.rq.state = runQueueFailed 1912 return 1913 1914 self.rq.state = runQueueComplete 1915 return 1916 1917 def finish(self): 1918 self.rq.state = runQueueCleanUp 1919 1920 active = self.stats.active + len(self.sq_live) 1921 if active > 0: 1922 bb.event.fire(runQueueExitWait(active), self.cfgData) 1923 self.rq.read_workers() 1924 return self.rq.active_fds() 1925 1926 if self.failed_tids: 1927 self.rq.state = runQueueFailed 1928 return True 1929 1930 self.rq.state = runQueueComplete 1931 return True 1932 1933 # Used by setscene only 1934 def check_dependencies(self, task, taskdeps): 1935 if not self.rq.depvalidate: 1936 return False 1937 1938 # Must not edit parent data 1939 taskdeps = set(taskdeps) 1940 1941 taskdata = {} 1942 taskdeps.add(task) 1943 for dep in taskdeps: 1944 (mc, fn, taskname, taskfn) = split_tid_mcfn(dep) 1945 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1946 taskdata[dep] = [pn, taskname, fn] 1947 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)" 1948 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data } 1949 valid = bb.utils.better_eval(call, locs) 1950 return valid 1951 1952 def can_start_task(self): 1953 active = self.stats.active + len(self.sq_live) 1954 can_start = active < self.number_tasks 1955 return can_start 1956 1957 def get_schedulers(self): 1958 schedulers = set(obj for obj in globals().values() 1959 if type(obj) is type and 1960 issubclass(obj, RunQueueScheduler)) 1961 1962 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS") 1963 if user_schedulers: 1964 for sched in user_schedulers.split(): 1965 if not "." in sched: 1966 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) 1967 continue 1968 1969 modname, name = sched.rsplit(".", 1) 1970 try: 1971 module = __import__(modname, fromlist=(name,)) 1972 except ImportError as exc: 1973 bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) 1974 else: 1975 schedulers.add(getattr(module, name)) 1976 return schedulers 1977 1978 def setbuildable(self, task): 1979 self.runq_buildable.add(task) 1980 self.sched.newbuildable(task) 1981 1982 def task_completeoutright(self, task): 1983 """ 1984 Mark a task as completed 1985 Look at the reverse dependencies and mark any task with 1986 completed dependencies as buildable 1987 """ 1988 self.runq_complete.add(task) 1989 for revdep in self.rqdata.runtaskentries[task].revdeps: 1990 if revdep in self.runq_running: 1991 continue 1992 if revdep in self.runq_buildable: 1993 continue 1994 alldeps = True 1995 for dep in self.rqdata.runtaskentries[revdep].depends: 1996 if dep not in self.runq_complete: 1997 alldeps = False 1998 break 1999 if alldeps: 2000 self.setbuildable(revdep) 2001 logger.debug("Marking task %s as buildable", revdep) 2002 2003 found = None 2004 for t in sorted(self.sq_deferred.copy()): 2005 if self.sq_deferred[t] == task: 2006 # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task. 2007 # We shouldn't allow all to run at once as it is prone to races. 2008 if not found: 2009 bb.debug(1, "Deferred task %s now buildable" % t) 2010 del self.sq_deferred[t] 2011 update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2012 found = t 2013 else: 2014 bb.debug(1, "Deferring %s after %s" % (t, found)) 2015 self.sq_deferred[t] = found 2016 2017 def task_complete(self, task): 2018 self.stats.taskCompleted() 2019 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2020 self.task_completeoutright(task) 2021 self.runq_tasksrun.add(task) 2022 2023 def task_fail(self, task, exitcode, fakerootlog=None): 2024 """ 2025 Called when a task has failed 2026 Updates the state engine with the failure 2027 """ 2028 self.stats.taskFailed() 2029 self.failed_tids.append(task) 2030 2031 fakeroot_log = [] 2032 if fakerootlog and os.path.exists(fakerootlog): 2033 with open(fakerootlog) as fakeroot_log_file: 2034 fakeroot_failed = False 2035 for line in reversed(fakeroot_log_file.readlines()): 2036 for fakeroot_error in ['mismatch', 'error', 'fatal']: 2037 if fakeroot_error in line.lower(): 2038 fakeroot_failed = True 2039 if 'doing new pid setup and server start' in line: 2040 break 2041 fakeroot_log.append(line) 2042 2043 if not fakeroot_failed: 2044 fakeroot_log = [] 2045 2046 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData) 2047 2048 if self.rqdata.taskData[''].halt: 2049 self.rq.state = runQueueCleanUp 2050 2051 def task_skip(self, task, reason): 2052 self.runq_running.add(task) 2053 self.setbuildable(task) 2054 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData) 2055 self.task_completeoutright(task) 2056 self.stats.taskSkipped() 2057 self.stats.taskCompleted() 2058 2059 def summarise_scenequeue_errors(self): 2060 err = False 2061 if not self.sqdone: 2062 logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) 2063 completeevent = sceneQueueComplete(self.stats, self.rq) 2064 bb.event.fire(completeevent, self.cfgData) 2065 if self.sq_deferred: 2066 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) 2067 err = True 2068 if self.updated_taskhash_queue: 2069 logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) 2070 err = True 2071 if self.holdoff_tasks: 2072 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) 2073 err = True 2074 2075 for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered): 2076 # No task should end up in both covered and uncovered, that is a bug. 2077 logger.error("Setscene task %s in both covered and notcovered." % tid) 2078 2079 for tid in self.rqdata.runq_setscene_tids: 2080 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2081 err = True 2082 logger.error("Setscene Task %s was never marked as covered or not covered" % tid) 2083 if tid not in self.sq_buildable: 2084 err = True 2085 logger.error("Setscene Task %s was never marked as buildable" % tid) 2086 if tid not in self.sq_running: 2087 err = True 2088 logger.error("Setscene Task %s was never marked as running" % tid) 2089 2090 for x in self.rqdata.runtaskentries: 2091 if x not in self.tasks_covered and x not in self.tasks_notcovered: 2092 logger.error("Task %s was never moved from the setscene queue" % x) 2093 err = True 2094 if x not in self.tasks_scenequeue_done: 2095 logger.error("Task %s was never processed by the setscene code" % x) 2096 err = True 2097 if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable: 2098 logger.error("Task %s was never marked as buildable by the setscene code" % x) 2099 err = True 2100 return err 2101 2102 2103 def execute(self): 2104 """ 2105 Run the tasks in a queue prepared by prepare_runqueue 2106 """ 2107 2108 self.rq.read_workers() 2109 if self.updated_taskhash_queue or self.pending_migrations: 2110 self.process_possible_migrations() 2111 2112 if not hasattr(self, "sorted_setscene_tids"): 2113 # Don't want to sort this set every execution 2114 self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids) 2115 2116 task = None 2117 if not self.sqdone and self.can_start_task(): 2118 # Find the next setscene to run 2119 for nexttask in self.sorted_setscene_tids: 2120 if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values(): 2121 if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): 2122 if nexttask not in self.rqdata.target_tids: 2123 logger.debug2("Skipping setscene for task %s" % nexttask) 2124 self.sq_task_skip(nexttask) 2125 self.scenequeue_notneeded.add(nexttask) 2126 if nexttask in self.sq_deferred: 2127 del self.sq_deferred[nexttask] 2128 return True 2129 # If covered tasks are running, need to wait for them to complete 2130 for t in self.sqdata.sq_covered_tasks[nexttask]: 2131 if t in self.runq_running and t not in self.runq_complete: 2132 continue 2133 if nexttask in self.sq_deferred: 2134 if self.sq_deferred[nexttask] not in self.runq_complete: 2135 continue 2136 logger.debug("Task %s no longer deferred" % nexttask) 2137 del self.sq_deferred[nexttask] 2138 valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False) 2139 if not valid: 2140 logger.debug("%s didn't become valid, skipping setscene" % nexttask) 2141 self.sq_task_failoutright(nexttask) 2142 return True 2143 if nexttask in self.sqdata.outrightfail: 2144 logger.debug2('No package found, so skipping setscene task %s', nexttask) 2145 self.sq_task_failoutright(nexttask) 2146 return True 2147 if nexttask in self.sqdata.unskippable: 2148 logger.debug2("Setscene task %s is unskippable" % nexttask) 2149 task = nexttask 2150 break 2151 if task is not None: 2152 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2153 taskname = taskname + "_setscene" 2154 if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): 2155 logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task) 2156 self.sq_task_failoutright(task) 2157 return True 2158 2159 if self.cooker.configuration.force: 2160 if task in self.rqdata.target_tids: 2161 self.sq_task_failoutright(task) 2162 return True 2163 2164 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2165 logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task) 2166 self.sq_task_skip(task) 2167 return True 2168 2169 if self.cooker.configuration.skipsetscene: 2170 logger.debug2('No setscene tasks should be executed. Skipping %s', task) 2171 self.sq_task_failoutright(task) 2172 return True 2173 2174 startevent = sceneQueueTaskStarted(task, self.stats, self.rq) 2175 bb.event.fire(startevent, self.cfgData) 2176 2177 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2178 realfn = bb.cache.virtualfn2realfn(taskfn)[0] 2179 runtask = { 2180 'fn' : taskfn, 2181 'task' : task, 2182 'taskname' : taskname, 2183 'taskhash' : self.rqdata.get_task_hash(task), 2184 'unihash' : self.rqdata.get_task_unihash(task), 2185 'quieterrors' : True, 2186 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), 2187 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], 2188 'taskdepdata' : self.sq_build_taskdepdata(task), 2189 'dry_run' : False, 2190 'taskdep': taskdep, 2191 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], 2192 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], 2193 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] 2194 } 2195 2196 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2197 if not mc in self.rq.fakeworker: 2198 self.rq.start_fakeworker(self, mc) 2199 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") 2200 self.rq.fakeworker[mc].process.stdin.flush() 2201 else: 2202 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") 2203 self.rq.worker[mc].process.stdin.flush() 2204 2205 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2206 self.build_stamps2.append(self.build_stamps[task]) 2207 self.sq_running.add(task) 2208 self.sq_live.add(task) 2209 self.stats.updateActiveSetscene(len(self.sq_live)) 2210 if self.can_start_task(): 2211 return True 2212 2213 self.update_holdofftasks() 2214 2215 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: 2216 hashequiv_logger.verbose("Setscene tasks completed") 2217 2218 err = self.summarise_scenequeue_errors() 2219 if err: 2220 self.rq.state = runQueueFailed 2221 return True 2222 2223 if self.cooker.configuration.setsceneonly: 2224 self.rq.state = runQueueComplete 2225 return True 2226 self.sqdone = True 2227 2228 if self.stats.total == 0: 2229 # nothing to do 2230 self.rq.state = runQueueComplete 2231 return True 2232 2233 if self.cooker.configuration.setsceneonly: 2234 task = None 2235 else: 2236 task = self.sched.next() 2237 if task is not None: 2238 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2239 2240 if self.rqdata.setscene_ignore_tasks is not None: 2241 if self.check_setscene_ignore_tasks(task): 2242 self.task_fail(task, "setscene ignore_tasks") 2243 return True 2244 2245 if task in self.tasks_covered: 2246 logger.debug2("Setscene covered task %s", task) 2247 self.task_skip(task, "covered") 2248 return True 2249 2250 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2251 logger.debug2("Stamp current task %s", task) 2252 2253 self.task_skip(task, "existing") 2254 self.runq_tasksrun.add(task) 2255 return True 2256 2257 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2258 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2259 startevent = runQueueTaskStarted(task, self.stats, self.rq, 2260 noexec=True) 2261 bb.event.fire(startevent, self.cfgData) 2262 self.runq_running.add(task) 2263 self.stats.taskActive() 2264 if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2265 bb.build.make_stamp_mcfn(taskname, taskfn) 2266 self.task_complete(task) 2267 return True 2268 else: 2269 startevent = runQueueTaskStarted(task, self.stats, self.rq) 2270 bb.event.fire(startevent, self.cfgData) 2271 2272 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2273 realfn = bb.cache.virtualfn2realfn(taskfn)[0] 2274 runtask = { 2275 'fn' : taskfn, 2276 'task' : task, 2277 'taskname' : taskname, 2278 'taskhash' : self.rqdata.get_task_hash(task), 2279 'unihash' : self.rqdata.get_task_unihash(task), 2280 'quieterrors' : False, 2281 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), 2282 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], 2283 'taskdepdata' : self.build_taskdepdata(task), 2284 'dry_run' : self.rqdata.setscene_enforce, 2285 'taskdep': taskdep, 2286 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], 2287 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], 2288 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] 2289 } 2290 2291 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2292 if not mc in self.rq.fakeworker: 2293 try: 2294 self.rq.start_fakeworker(self, mc) 2295 except OSError as exc: 2296 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) 2297 self.rq.state = runQueueFailed 2298 self.stats.taskFailed() 2299 return True 2300 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") 2301 self.rq.fakeworker[mc].process.stdin.flush() 2302 else: 2303 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") 2304 self.rq.worker[mc].process.stdin.flush() 2305 2306 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2307 self.build_stamps2.append(self.build_stamps[task]) 2308 self.runq_running.add(task) 2309 self.stats.taskActive() 2310 if self.can_start_task(): 2311 return True 2312 2313 if self.stats.active > 0 or self.sq_live: 2314 self.rq.read_workers() 2315 return self.rq.active_fds() 2316 2317 # No more tasks can be run. If we have deferred setscene tasks we should run them. 2318 if self.sq_deferred: 2319 deferred_tid = list(self.sq_deferred.keys())[0] 2320 blocking_tid = self.sq_deferred.pop(deferred_tid) 2321 logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid)) 2322 return True 2323 2324 if self.failed_tids: 2325 self.rq.state = runQueueFailed 2326 return True 2327 2328 # Sanity Checks 2329 err = self.summarise_scenequeue_errors() 2330 for task in self.rqdata.runtaskentries: 2331 if task not in self.runq_buildable: 2332 logger.error("Task %s never buildable!", task) 2333 err = True 2334 elif task not in self.runq_running: 2335 logger.error("Task %s never ran!", task) 2336 err = True 2337 elif task not in self.runq_complete: 2338 logger.error("Task %s never completed!", task) 2339 err = True 2340 2341 if err: 2342 self.rq.state = runQueueFailed 2343 else: 2344 self.rq.state = runQueueComplete 2345 2346 return True 2347 2348 def filtermcdeps(self, task, mc, deps): 2349 ret = set() 2350 for dep in deps: 2351 thismc = mc_from_tid(dep) 2352 if thismc != mc: 2353 continue 2354 ret.add(dep) 2355 return ret 2356 2357 # We filter out multiconfig dependencies from taskdepdata we pass to the tasks 2358 # as most code can't handle them 2359 def build_taskdepdata(self, task): 2360 taskdepdata = {} 2361 mc = mc_from_tid(task) 2362 next = self.rqdata.runtaskentries[task].depends.copy() 2363 next.add(task) 2364 next = self.filtermcdeps(task, mc, next) 2365 while next: 2366 additional = [] 2367 for revdep in next: 2368 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2369 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2370 deps = self.rqdata.runtaskentries[revdep].depends 2371 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2372 taskhash = self.rqdata.runtaskentries[revdep].hash 2373 unihash = self.rqdata.runtaskentries[revdep].unihash 2374 deps = self.filtermcdeps(task, mc, deps) 2375 hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn] 2376 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn] 2377 for revdep2 in deps: 2378 if revdep2 not in taskdepdata: 2379 additional.append(revdep2) 2380 next = additional 2381 2382 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2383 return taskdepdata 2384 2385 def update_holdofftasks(self): 2386 2387 if not self.holdoff_need_update: 2388 return 2389 2390 notcovered = set(self.scenequeue_notcovered) 2391 notcovered |= self.cantskip 2392 for tid in self.scenequeue_notcovered: 2393 notcovered |= self.sqdata.sq_covered_tasks[tid] 2394 notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) 2395 notcovered.intersection_update(self.tasks_scenequeue_done) 2396 2397 covered = set(self.scenequeue_covered) 2398 for tid in self.scenequeue_covered: 2399 covered |= self.sqdata.sq_covered_tasks[tid] 2400 covered.difference_update(notcovered) 2401 covered.intersection_update(self.tasks_scenequeue_done) 2402 2403 for tid in notcovered | covered: 2404 if not self.rqdata.runtaskentries[tid].depends: 2405 self.setbuildable(tid) 2406 elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): 2407 self.setbuildable(tid) 2408 2409 self.tasks_covered = covered 2410 self.tasks_notcovered = notcovered 2411 2412 self.holdoff_tasks = set() 2413 2414 for tid in self.rqdata.runq_setscene_tids: 2415 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2416 self.holdoff_tasks.add(tid) 2417 2418 for tid in self.holdoff_tasks.copy(): 2419 for dep in self.sqdata.sq_covered_tasks[tid]: 2420 if dep not in self.runq_complete: 2421 self.holdoff_tasks.add(dep) 2422 2423 self.holdoff_need_update = False 2424 2425 def process_possible_migrations(self): 2426 2427 changed = set() 2428 toprocess = set() 2429 for tid, unihash in self.updated_taskhash_queue.copy(): 2430 if tid in self.runq_running and tid not in self.runq_complete: 2431 continue 2432 2433 self.updated_taskhash_queue.remove((tid, unihash)) 2434 2435 if unihash != self.rqdata.runtaskentries[tid].unihash: 2436 # Make sure we rehash any other tasks with the same task hash that we're deferred against. 2437 torehash = [tid] 2438 for deftid in self.sq_deferred: 2439 if self.sq_deferred[deftid] == tid: 2440 torehash.append(deftid) 2441 for hashtid in torehash: 2442 hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) 2443 self.rqdata.runtaskentries[hashtid].unihash = unihash 2444 bb.parse.siggen.set_unihash(hashtid, unihash) 2445 toprocess.add(hashtid) 2446 if torehash: 2447 # Need to save after set_unihash above 2448 bb.parse.siggen.save_unitaskhashes() 2449 2450 # Work out all tasks which depend upon these 2451 total = set() 2452 next = set() 2453 for p in toprocess: 2454 next |= self.rqdata.runtaskentries[p].revdeps 2455 while next: 2456 current = next.copy() 2457 total = total | next 2458 next = set() 2459 for ntid in current: 2460 next |= self.rqdata.runtaskentries[ntid].revdeps 2461 next.difference_update(total) 2462 2463 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash 2464 next = set() 2465 for p in total: 2466 if not self.rqdata.runtaskentries[p].depends: 2467 next.add(p) 2468 elif self.rqdata.runtaskentries[p].depends.isdisjoint(total): 2469 next.add(p) 2470 2471 # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled 2472 while next: 2473 current = next.copy() 2474 next = set() 2475 for tid in current: 2476 if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): 2477 continue 2478 orighash = self.rqdata.runtaskentries[tid].hash 2479 newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches) 2480 origuni = self.rqdata.runtaskentries[tid].unihash 2481 newuni = bb.parse.siggen.get_unihash(tid) 2482 # FIXME, need to check it can come from sstate at all for determinism? 2483 remapped = False 2484 if newuni == origuni: 2485 # Nothing to do, we match, skip code below 2486 remapped = True 2487 elif tid in self.scenequeue_covered or tid in self.sq_live: 2488 # Already ran this setscene task or it running. Report the new taskhash 2489 bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches) 2490 hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid)) 2491 remapped = True 2492 2493 if not remapped: 2494 #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni)) 2495 self.rqdata.runtaskentries[tid].hash = newhash 2496 self.rqdata.runtaskentries[tid].unihash = newuni 2497 changed.add(tid) 2498 2499 next |= self.rqdata.runtaskentries[tid].revdeps 2500 total.remove(tid) 2501 next.intersection_update(total) 2502 2503 if changed: 2504 for mc in self.rq.worker: 2505 self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2506 for mc in self.rq.fakeworker: 2507 self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2508 2509 hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) 2510 2511 for tid in changed: 2512 if tid not in self.rqdata.runq_setscene_tids: 2513 continue 2514 if tid not in self.pending_migrations: 2515 self.pending_migrations.add(tid) 2516 2517 update_tasks = [] 2518 for tid in self.pending_migrations.copy(): 2519 if tid in self.runq_running or tid in self.sq_live: 2520 # Too late, task already running, not much we can do now 2521 self.pending_migrations.remove(tid) 2522 continue 2523 2524 valid = True 2525 # Check no tasks this covers are running 2526 for dep in self.sqdata.sq_covered_tasks[tid]: 2527 if dep in self.runq_running and dep not in self.runq_complete: 2528 hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid)) 2529 valid = False 2530 break 2531 if not valid: 2532 continue 2533 2534 self.pending_migrations.remove(tid) 2535 changed = True 2536 2537 if tid in self.tasks_scenequeue_done: 2538 self.tasks_scenequeue_done.remove(tid) 2539 for dep in self.sqdata.sq_covered_tasks[tid]: 2540 if dep in self.runq_complete and dep not in self.runq_tasksrun: 2541 bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep) 2542 self.failed_tids.append(tid) 2543 self.rq.state = runQueueCleanUp 2544 return 2545 2546 if dep not in self.runq_complete: 2547 if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable: 2548 self.tasks_scenequeue_done.remove(dep) 2549 2550 if tid in self.sq_buildable: 2551 self.sq_buildable.remove(tid) 2552 if tid in self.sq_running: 2553 self.sq_running.remove(tid) 2554 if tid in self.sqdata.outrightfail: 2555 self.sqdata.outrightfail.remove(tid) 2556 if tid in self.scenequeue_notcovered: 2557 self.scenequeue_notcovered.remove(tid) 2558 if tid in self.scenequeue_covered: 2559 self.scenequeue_covered.remove(tid) 2560 if tid in self.scenequeue_notneeded: 2561 self.scenequeue_notneeded.remove(tid) 2562 2563 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2564 self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2565 2566 if tid in self.stampcache: 2567 del self.stampcache[tid] 2568 2569 if tid in self.build_stamps: 2570 del self.build_stamps[tid] 2571 2572 update_tasks.append(tid) 2573 2574 update_tasks2 = [] 2575 for tid in update_tasks: 2576 harddepfail = False 2577 for t in self.sqdata.sq_harddeps: 2578 if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered: 2579 harddepfail = True 2580 break 2581 if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2582 if tid not in self.sq_buildable: 2583 self.sq_buildable.add(tid) 2584 if not self.sqdata.sq_revdeps[tid]: 2585 self.sq_buildable.add(tid) 2586 2587 update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid)) 2588 2589 if update_tasks2: 2590 self.sqdone = False 2591 for mc in sorted(self.sqdata.multiconfigs): 2592 for tid in sorted([t[0] for t in update_tasks2]): 2593 if mc_from_tid(tid) != mc: 2594 continue 2595 h = pending_hash_index(tid, self.rqdata) 2596 if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]: 2597 self.sq_deferred[tid] = self.sqdata.hashes[h] 2598 bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h])) 2599 update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2600 2601 for (tid, harddepfail, origvalid) in update_tasks2: 2602 if tid in self.sqdata.valid and not origvalid: 2603 hashequiv_logger.verbose("Setscene task %s became valid" % tid) 2604 if harddepfail: 2605 logger.debug2("%s has an unavailable hard dependency so skipping" % (tid)) 2606 self.sq_task_failoutright(tid) 2607 2608 if changed: 2609 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2610 self.holdoff_need_update = True 2611 2612 def scenequeue_updatecounters(self, task, fail=False): 2613 2614 for dep in sorted(self.sqdata.sq_deps[task]): 2615 if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: 2616 if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered: 2617 # dependency could be already processed, e.g. noexec setscene task 2618 continue 2619 noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache) 2620 if noexec or stamppresent: 2621 continue 2622 logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) 2623 self.sq_task_failoutright(dep) 2624 continue 2625 if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2626 if dep not in self.sq_buildable: 2627 self.sq_buildable.add(dep) 2628 2629 next = set([task]) 2630 while next: 2631 new = set() 2632 for t in sorted(next): 2633 self.tasks_scenequeue_done.add(t) 2634 # Look down the dependency chain for non-setscene things which this task depends on 2635 # and mark as 'done' 2636 for dep in self.rqdata.runtaskentries[t].depends: 2637 if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: 2638 continue 2639 if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): 2640 new.add(dep) 2641 next = new 2642 2643 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2644 self.holdoff_need_update = True 2645 2646 def sq_task_completeoutright(self, task): 2647 """ 2648 Mark a task as completed 2649 Look at the reverse dependencies and mark any task with 2650 completed dependencies as buildable 2651 """ 2652 2653 logger.debug('Found task %s which could be accelerated', task) 2654 self.scenequeue_covered.add(task) 2655 self.scenequeue_updatecounters(task) 2656 2657 def sq_check_taskfail(self, task): 2658 if self.rqdata.setscene_ignore_tasks is not None: 2659 realtask = task.split('_setscene')[0] 2660 (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) 2661 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2662 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2663 logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) 2664 self.rq.state = runQueueCleanUp 2665 2666 def sq_task_complete(self, task): 2667 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2668 self.sq_task_completeoutright(task) 2669 2670 def sq_task_fail(self, task, result): 2671 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData) 2672 self.scenequeue_notcovered.add(task) 2673 self.scenequeue_updatecounters(task, True) 2674 self.sq_check_taskfail(task) 2675 2676 def sq_task_failoutright(self, task): 2677 self.sq_running.add(task) 2678 self.sq_buildable.add(task) 2679 self.scenequeue_notcovered.add(task) 2680 self.scenequeue_updatecounters(task, True) 2681 2682 def sq_task_skip(self, task): 2683 self.sq_running.add(task) 2684 self.sq_buildable.add(task) 2685 self.sq_task_completeoutright(task) 2686 2687 def sq_build_taskdepdata(self, task): 2688 def getsetscenedeps(tid): 2689 deps = set() 2690 (mc, fn, taskname, _) = split_tid_mcfn(tid) 2691 realtid = tid + "_setscene" 2692 idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends 2693 for (depname, idependtask) in idepends: 2694 if depname not in self.rqdata.taskData[mc].build_targets: 2695 continue 2696 2697 depfn = self.rqdata.taskData[mc].build_targets[depname][0] 2698 if depfn is None: 2699 continue 2700 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2701 deps.add(deptid) 2702 return deps 2703 2704 taskdepdata = {} 2705 next = getsetscenedeps(task) 2706 next.add(task) 2707 while next: 2708 additional = [] 2709 for revdep in next: 2710 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2711 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2712 deps = getsetscenedeps(revdep) 2713 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2714 taskhash = self.rqdata.runtaskentries[revdep].hash 2715 unihash = self.rqdata.runtaskentries[revdep].unihash 2716 hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn] 2717 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn] 2718 for revdep2 in deps: 2719 if revdep2 not in taskdepdata: 2720 additional.append(revdep2) 2721 next = additional 2722 2723 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2724 return taskdepdata 2725 2726 def check_setscene_ignore_tasks(self, tid): 2727 # Check task that is going to run against the ignore tasks list 2728 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2729 # Ignore covered tasks 2730 if tid in self.tasks_covered: 2731 return False 2732 # Ignore stamped tasks 2733 if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): 2734 return False 2735 # Ignore noexec tasks 2736 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2737 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2738 return False 2739 2740 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2741 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2742 if tid in self.rqdata.runq_setscene_tids: 2743 msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)] 2744 else: 2745 msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)] 2746 for t in self.scenequeue_notcovered: 2747 msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash)) 2748 msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered)) 2749 logger.error("".join(msg)) 2750 return True 2751 return False 2752 2753class SQData(object): 2754 def __init__(self): 2755 # SceneQueue dependencies 2756 self.sq_deps = {} 2757 # SceneQueue reverse dependencies 2758 self.sq_revdeps = {} 2759 # Injected inter-setscene task dependencies 2760 self.sq_harddeps = {} 2761 # Cache of stamp files so duplicates can't run in parallel 2762 self.stamps = {} 2763 # Setscene tasks directly depended upon by the build 2764 self.unskippable = set() 2765 # List of setscene tasks which aren't present 2766 self.outrightfail = set() 2767 # A list of normal tasks a setscene task covers 2768 self.sq_covered_tasks = {} 2769 2770def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): 2771 2772 sq_revdeps = {} 2773 sq_revdeps_squash = {} 2774 sq_collated_deps = {} 2775 2776 # We need to construct a dependency graph for the setscene functions. Intermediate 2777 # dependencies between the setscene tasks only complicate the code. This code 2778 # therefore aims to collapse the huge runqueue dependency tree into a smaller one 2779 # only containing the setscene functions. 2780 2781 rqdata.init_progress_reporter.next_stage() 2782 2783 # First process the chains up to the first setscene task. 2784 endpoints = {} 2785 for tid in rqdata.runtaskentries: 2786 sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) 2787 sq_revdeps_squash[tid] = set() 2788 if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids: 2789 #bb.warn("Added endpoint %s" % (tid)) 2790 endpoints[tid] = set() 2791 2792 rqdata.init_progress_reporter.next_stage() 2793 2794 # Secondly process the chains between setscene tasks. 2795 for tid in rqdata.runq_setscene_tids: 2796 sq_collated_deps[tid] = set() 2797 #bb.warn("Added endpoint 2 %s" % (tid)) 2798 for dep in rqdata.runtaskentries[tid].depends: 2799 if tid in sq_revdeps[dep]: 2800 sq_revdeps[dep].remove(tid) 2801 if dep not in endpoints: 2802 endpoints[dep] = set() 2803 #bb.warn(" Added endpoint 3 %s" % (dep)) 2804 endpoints[dep].add(tid) 2805 2806 rqdata.init_progress_reporter.next_stage() 2807 2808 def process_endpoints(endpoints): 2809 newendpoints = {} 2810 for point, task in endpoints.items(): 2811 tasks = set() 2812 if task: 2813 tasks |= task 2814 if sq_revdeps_squash[point]: 2815 tasks |= sq_revdeps_squash[point] 2816 if point not in rqdata.runq_setscene_tids: 2817 for t in tasks: 2818 sq_collated_deps[t].add(point) 2819 sq_revdeps_squash[point] = set() 2820 if point in rqdata.runq_setscene_tids: 2821 sq_revdeps_squash[point] = tasks 2822 continue 2823 for dep in rqdata.runtaskentries[point].depends: 2824 if point in sq_revdeps[dep]: 2825 sq_revdeps[dep].remove(point) 2826 if tasks: 2827 sq_revdeps_squash[dep] |= tasks 2828 if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids: 2829 newendpoints[dep] = task 2830 if newendpoints: 2831 process_endpoints(newendpoints) 2832 2833 process_endpoints(endpoints) 2834 2835 rqdata.init_progress_reporter.next_stage() 2836 2837 # Build a list of tasks which are "unskippable" 2838 # These are direct endpoints referenced by the build upto and including setscene tasks 2839 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon 2840 new = True 2841 for tid in rqdata.runtaskentries: 2842 if not rqdata.runtaskentries[tid].revdeps: 2843 sqdata.unskippable.add(tid) 2844 sqdata.unskippable |= sqrq.cantskip 2845 while new: 2846 new = False 2847 orig = sqdata.unskippable.copy() 2848 for tid in sorted(orig, reverse=True): 2849 if tid in rqdata.runq_setscene_tids: 2850 continue 2851 if not rqdata.runtaskentries[tid].depends: 2852 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable 2853 sqrq.setbuildable(tid) 2854 sqdata.unskippable |= rqdata.runtaskentries[tid].depends 2855 if sqdata.unskippable != orig: 2856 new = True 2857 2858 sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids) 2859 2860 rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) 2861 2862 # Sanity check all dependencies could be changed to setscene task references 2863 for taskcounter, tid in enumerate(rqdata.runtaskentries): 2864 if tid in rqdata.runq_setscene_tids: 2865 pass 2866 elif sq_revdeps_squash[tid]: 2867 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.") 2868 else: 2869 del sq_revdeps_squash[tid] 2870 rqdata.init_progress_reporter.update(taskcounter) 2871 2872 rqdata.init_progress_reporter.next_stage() 2873 2874 # Resolve setscene inter-task dependencies 2875 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" 2876 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies 2877 for tid in rqdata.runq_setscene_tids: 2878 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2879 realtid = tid + "_setscene" 2880 idepends = rqdata.taskData[mc].taskentries[realtid].idepends 2881 sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2882 2883 for (depname, idependtask) in idepends: 2884 2885 if depname not in rqdata.taskData[mc].build_targets: 2886 continue 2887 2888 depfn = rqdata.taskData[mc].build_targets[depname][0] 2889 if depfn is None: 2890 continue 2891 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2892 if deptid not in rqdata.runtaskentries: 2893 bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) 2894 2895 if not deptid in sqdata.sq_harddeps: 2896 sqdata.sq_harddeps[deptid] = set() 2897 sqdata.sq_harddeps[deptid].add(tid) 2898 2899 sq_revdeps_squash[tid].add(deptid) 2900 # Have to zero this to avoid circular dependencies 2901 sq_revdeps_squash[deptid] = set() 2902 2903 rqdata.init_progress_reporter.next_stage() 2904 2905 for task in sqdata.sq_harddeps: 2906 for dep in sqdata.sq_harddeps[task]: 2907 sq_revdeps_squash[dep].add(task) 2908 2909 rqdata.init_progress_reporter.next_stage() 2910 2911 #for tid in sq_revdeps_squash: 2912 # data = "" 2913 # for dep in sq_revdeps_squash[tid]: 2914 # data = data + "\n %s" % dep 2915 # bb.warn("Task %s_setscene: is %s " % (tid, data)) 2916 2917 sqdata.sq_revdeps = sq_revdeps_squash 2918 sqdata.sq_covered_tasks = sq_collated_deps 2919 2920 # Build reverse version of revdeps to populate deps structure 2921 for tid in sqdata.sq_revdeps: 2922 sqdata.sq_deps[tid] = set() 2923 for tid in sqdata.sq_revdeps: 2924 for dep in sqdata.sq_revdeps[tid]: 2925 sqdata.sq_deps[dep].add(tid) 2926 2927 rqdata.init_progress_reporter.next_stage() 2928 2929 sqdata.multiconfigs = set() 2930 for tid in sqdata.sq_revdeps: 2931 sqdata.multiconfigs.add(mc_from_tid(tid)) 2932 if not sqdata.sq_revdeps[tid]: 2933 sqrq.sq_buildable.add(tid) 2934 2935 rqdata.init_progress_reporter.finish() 2936 2937 sqdata.noexec = set() 2938 sqdata.stamppresent = set() 2939 sqdata.valid = set() 2940 2941 sqdata.hashes = {} 2942 sqrq.sq_deferred = {} 2943 for mc in sorted(sqdata.multiconfigs): 2944 for tid in sorted(sqdata.sq_revdeps): 2945 if mc_from_tid(tid) != mc: 2946 continue 2947 h = pending_hash_index(tid, rqdata) 2948 if h not in sqdata.hashes: 2949 sqdata.hashes[h] = tid 2950 else: 2951 sqrq.sq_deferred[tid] = sqdata.hashes[h] 2952 bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h])) 2953 2954 update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True) 2955 2956 # Compute a list of 'stale' sstate tasks where the current hash does not match the one 2957 # in any stamp files. Pass the list out to metadata as an event. 2958 found = {} 2959 for tid in rqdata.runq_setscene_tids: 2960 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2961 stamps = bb.build.find_stale_stamps(taskname, taskfn) 2962 if stamps: 2963 if mc not in found: 2964 found[mc] = {} 2965 found[mc][tid] = stamps 2966 for mc in found: 2967 event = bb.event.StaleSetSceneTasks(found[mc]) 2968 bb.event.fire(event, cooker.databuilder.mcdata[mc]) 2969 2970def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False): 2971 2972 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2973 2974 taskdep = rqdata.dataCaches[mc].task_deps[taskfn] 2975 2976 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2977 bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn) 2978 return True, False 2979 2980 if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache): 2981 logger.debug2('Setscene stamp current for task %s', tid) 2982 return False, True 2983 2984 if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache): 2985 logger.debug2('Normal stamp current for task %s', tid) 2986 return False, True 2987 2988 return False, False 2989 2990def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): 2991 2992 tocheck = set() 2993 2994 for tid in sorted(tids): 2995 if tid in sqdata.stamppresent: 2996 sqdata.stamppresent.remove(tid) 2997 if tid in sqdata.valid: 2998 sqdata.valid.remove(tid) 2999 if tid in sqdata.outrightfail: 3000 sqdata.outrightfail.remove(tid) 3001 3002 noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True) 3003 3004 if noexec: 3005 sqdata.noexec.add(tid) 3006 sqrq.sq_task_skip(tid) 3007 logger.debug2("%s is noexec so skipping setscene" % (tid)) 3008 continue 3009 3010 if stamppresent: 3011 sqdata.stamppresent.add(tid) 3012 sqrq.sq_task_skip(tid) 3013 logger.debug2("%s has a valid stamp, skipping" % (tid)) 3014 continue 3015 3016 tocheck.add(tid) 3017 3018 sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) 3019 3020 for tid in tids: 3021 if tid in sqdata.stamppresent: 3022 continue 3023 if tid in sqdata.valid: 3024 continue 3025 if tid in sqdata.noexec: 3026 continue 3027 if tid in sqrq.scenequeue_covered: 3028 continue 3029 if tid in sqrq.scenequeue_notcovered: 3030 continue 3031 if tid in sqrq.sq_deferred: 3032 continue 3033 sqdata.outrightfail.add(tid) 3034 logger.debug2("%s already handled (fallthrough), skipping" % (tid)) 3035 3036class TaskFailure(Exception): 3037 """ 3038 Exception raised when a task in a runqueue fails 3039 """ 3040 def __init__(self, x): 3041 self.args = x 3042 3043 3044class runQueueExitWait(bb.event.Event): 3045 """ 3046 Event when waiting for task processes to exit 3047 """ 3048 3049 def __init__(self, remain): 3050 self.remain = remain 3051 self.message = "Waiting for %s active tasks to finish" % remain 3052 bb.event.Event.__init__(self) 3053 3054class runQueueEvent(bb.event.Event): 3055 """ 3056 Base runQueue event class 3057 """ 3058 def __init__(self, task, stats, rq): 3059 self.taskid = task 3060 self.taskstring = task 3061 self.taskname = taskname_from_tid(task) 3062 self.taskfile = fn_from_tid(task) 3063 self.taskhash = rq.rqdata.get_task_hash(task) 3064 self.stats = stats.copy() 3065 bb.event.Event.__init__(self) 3066 3067class sceneQueueEvent(runQueueEvent): 3068 """ 3069 Base sceneQueue event class 3070 """ 3071 def __init__(self, task, stats, rq, noexec=False): 3072 runQueueEvent.__init__(self, task, stats, rq) 3073 self.taskstring = task + "_setscene" 3074 self.taskname = taskname_from_tid(task) + "_setscene" 3075 self.taskfile = fn_from_tid(task) 3076 self.taskhash = rq.rqdata.get_task_hash(task) 3077 3078class runQueueTaskStarted(runQueueEvent): 3079 """ 3080 Event notifying a task was started 3081 """ 3082 def __init__(self, task, stats, rq, noexec=False): 3083 runQueueEvent.__init__(self, task, stats, rq) 3084 self.noexec = noexec 3085 3086class sceneQueueTaskStarted(sceneQueueEvent): 3087 """ 3088 Event notifying a setscene task was started 3089 """ 3090 def __init__(self, task, stats, rq, noexec=False): 3091 sceneQueueEvent.__init__(self, task, stats, rq) 3092 self.noexec = noexec 3093 3094class runQueueTaskFailed(runQueueEvent): 3095 """ 3096 Event notifying a task failed 3097 """ 3098 def __init__(self, task, stats, exitcode, rq, fakeroot_log=None): 3099 runQueueEvent.__init__(self, task, stats, rq) 3100 self.exitcode = exitcode 3101 self.fakeroot_log = fakeroot_log 3102 3103 def __str__(self): 3104 if self.fakeroot_log: 3105 return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log) 3106 else: 3107 return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) 3108 3109class sceneQueueTaskFailed(sceneQueueEvent): 3110 """ 3111 Event notifying a setscene task failed 3112 """ 3113 def __init__(self, task, stats, exitcode, rq): 3114 sceneQueueEvent.__init__(self, task, stats, rq) 3115 self.exitcode = exitcode 3116 3117 def __str__(self): 3118 return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode) 3119 3120class sceneQueueComplete(sceneQueueEvent): 3121 """ 3122 Event when all the sceneQueue tasks are complete 3123 """ 3124 def __init__(self, stats, rq): 3125 self.stats = stats.copy() 3126 bb.event.Event.__init__(self) 3127 3128class runQueueTaskCompleted(runQueueEvent): 3129 """ 3130 Event notifying a task completed 3131 """ 3132 3133class sceneQueueTaskCompleted(sceneQueueEvent): 3134 """ 3135 Event notifying a setscene task completed 3136 """ 3137 3138class runQueueTaskSkipped(runQueueEvent): 3139 """ 3140 Event notifying a task was skipped 3141 """ 3142 def __init__(self, task, stats, rq, reason): 3143 runQueueEvent.__init__(self, task, stats, rq) 3144 self.reason = reason 3145 3146class taskUniHashUpdate(bb.event.Event): 3147 """ 3148 Base runQueue event class 3149 """ 3150 def __init__(self, task, unihash): 3151 self.taskid = task 3152 self.unihash = unihash 3153 bb.event.Event.__init__(self) 3154 3155class runQueuePipe(): 3156 """ 3157 Abstraction for a pipe between a worker thread and the server 3158 """ 3159 def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None): 3160 self.input = pipein 3161 if pipeout: 3162 pipeout.close() 3163 bb.utils.nonblockingfd(self.input) 3164 self.queue = bytearray() 3165 self.d = d 3166 self.rq = rq 3167 self.rqexec = rqexec 3168 self.fakerootlogs = fakerootlogs 3169 3170 def setrunqueueexec(self, rqexec): 3171 self.rqexec = rqexec 3172 3173 def read(self): 3174 for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]: 3175 for worker in workers.values(): 3176 worker.process.poll() 3177 if worker.process.returncode is not None and not self.rq.teardown: 3178 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode))) 3179 self.rq.finish_runqueue(True) 3180 3181 start = len(self.queue) 3182 try: 3183 self.queue.extend(self.input.read(102400) or b"") 3184 except (OSError, IOError) as e: 3185 if e.errno != errno.EAGAIN: 3186 raise 3187 end = len(self.queue) 3188 found = True 3189 while found and self.queue: 3190 found = False 3191 index = self.queue.find(b"</event>") 3192 while index != -1 and self.queue.startswith(b"<event>"): 3193 try: 3194 event = pickle.loads(self.queue[7:index]) 3195 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3196 if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e): 3197 # The pickled data could contain "</event>" so search for the next occurance 3198 # unpickling again, this should be the only way an unpickle error could occur 3199 index = self.queue.find(b"</event>", index + 1) 3200 continue 3201 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) 3202 bb.event.fire_from_worker(event, self.d) 3203 if isinstance(event, taskUniHashUpdate): 3204 self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) 3205 found = True 3206 self.queue = self.queue[index+8:] 3207 index = self.queue.find(b"</event>") 3208 index = self.queue.find(b"</exitcode>") 3209 while index != -1 and self.queue.startswith(b"<exitcode>"): 3210 try: 3211 task, status = pickle.loads(self.queue[10:index]) 3212 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3213 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) 3214 (_, _, _, taskfn) = split_tid_mcfn(task) 3215 fakerootlog = None 3216 if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs: 3217 fakerootlog = self.fakerootlogs[taskfn] 3218 self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog) 3219 found = True 3220 self.queue = self.queue[index+11:] 3221 index = self.queue.find(b"</exitcode>") 3222 return (end > start) 3223 3224 def close(self): 3225 while self.read(): 3226 continue 3227 if self.queue: 3228 print("Warning, worker left partial message: %s" % self.queue) 3229 self.input.close() 3230 3231def get_setscene_enforce_ignore_tasks(d, targets): 3232 if d.getVar('BB_SETSCENE_ENFORCE') != '1': 3233 return None 3234 ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split() 3235 outlist = [] 3236 for item in ignore_tasks[:]: 3237 if item.startswith('%:'): 3238 for (mc, target, task, fn) in targets: 3239 outlist.append(target + ':' + item.split(':')[1]) 3240 else: 3241 outlist.append(item) 3242 return outlist 3243 3244def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks): 3245 import fnmatch 3246 if ignore_tasks is not None: 3247 item = '%s:%s' % (pn, taskname) 3248 for ignore_tasks in ignore_tasks: 3249 if fnmatch.fnmatch(item, ignore_tasks): 3250 return True 3251 return False 3252 return True 3253