1""" 2BitBake 'RunQueue' implementation 3 4Handles preparation and execution of a queue of tasks 5""" 6 7# Copyright (C) 2006-2007 Richard Purdie 8# 9# SPDX-License-Identifier: GPL-2.0-only 10# 11 12import copy 13import os 14import sys 15import stat 16import errno 17import itertools 18import logging 19import re 20import bb 21from bb import msg, event 22from bb import monitordisk 23import subprocess 24import pickle 25from multiprocessing import Process 26import shlex 27import pprint 28import time 29 30bblogger = logging.getLogger("BitBake") 31logger = logging.getLogger("BitBake.RunQueue") 32hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv") 33 34__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' ) 35 36def fn_from_tid(tid): 37 return tid.rsplit(":", 1)[0] 38 39def taskname_from_tid(tid): 40 return tid.rsplit(":", 1)[1] 41 42def mc_from_tid(tid): 43 if tid.startswith('mc:') and tid.count(':') >= 2: 44 return tid.split(':')[1] 45 return "" 46 47def split_tid(tid): 48 (mc, fn, taskname, _) = split_tid_mcfn(tid) 49 return (mc, fn, taskname) 50 51def split_mc(n): 52 if n.startswith("mc:") and n.count(':') >= 2: 53 _, mc, n = n.split(":", 2) 54 return (mc, n) 55 return ('', n) 56 57def split_tid_mcfn(tid): 58 if tid.startswith('mc:') and tid.count(':') >= 2: 59 elems = tid.split(':') 60 mc = elems[1] 61 fn = ":".join(elems[2:-1]) 62 taskname = elems[-1] 63 mcfn = "mc:" + mc + ":" + fn 64 else: 65 tid = tid.rsplit(":", 1) 66 mc = "" 67 fn = tid[0] 68 taskname = tid[1] 69 mcfn = fn 70 71 return (mc, fn, taskname, mcfn) 72 73def build_tid(mc, fn, taskname): 74 if mc: 75 return "mc:" + mc + ":" + fn + ":" + taskname 76 return fn + ":" + taskname 77 78# Index used to pair up potentially matching multiconfig tasks 79# We match on PN, taskname and hash being equal 80def pending_hash_index(tid, rqdata): 81 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 82 pn = rqdata.dataCaches[mc].pkg_fn[taskfn] 83 h = rqdata.runtaskentries[tid].unihash 84 return pn + ":" + "taskname" + h 85 86class RunQueueStats: 87 """ 88 Holds statistics on the tasks handled by the associated runQueue 89 """ 90 def __init__(self, total, setscene_total): 91 self.completed = 0 92 self.skipped = 0 93 self.failed = 0 94 self.active = 0 95 self.setscene_active = 0 96 self.setscene_covered = 0 97 self.setscene_notcovered = 0 98 self.setscene_total = setscene_total 99 self.total = total 100 101 def copy(self): 102 obj = self.__class__(self.total, self.setscene_total) 103 obj.__dict__.update(self.__dict__) 104 return obj 105 106 def taskFailed(self): 107 self.active = self.active - 1 108 self.failed = self.failed + 1 109 110 def taskCompleted(self): 111 self.active = self.active - 1 112 self.completed = self.completed + 1 113 114 def taskSkipped(self): 115 self.active = self.active + 1 116 self.skipped = self.skipped + 1 117 118 def taskActive(self): 119 self.active = self.active + 1 120 121 def updateCovered(self, covered, notcovered): 122 self.setscene_covered = covered 123 self.setscene_notcovered = notcovered 124 125 def updateActiveSetscene(self, active): 126 self.setscene_active = active 127 128# These values indicate the next step due to be run in the 129# runQueue state machine 130runQueuePrepare = 2 131runQueueSceneInit = 3 132runQueueDumpSigs = 4 133runQueueRunning = 6 134runQueueFailed = 7 135runQueueCleanUp = 8 136runQueueComplete = 9 137 138class RunQueueScheduler(object): 139 """ 140 Control the order tasks are scheduled in. 141 """ 142 name = "basic" 143 144 def __init__(self, runqueue, rqdata): 145 """ 146 The default scheduler just returns the first buildable task (the 147 priority map is sorted by task number) 148 """ 149 self.rq = runqueue 150 self.rqdata = rqdata 151 self.numTasks = len(self.rqdata.runtaskentries) 152 153 self.prio_map = [self.rqdata.runtaskentries.keys()] 154 155 self.buildable = set() 156 self.skip_maxthread = {} 157 self.stamps = {} 158 for tid in self.rqdata.runtaskentries: 159 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 160 self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 161 if tid in self.rq.runq_buildable: 162 self.buildable.add(tid) 163 164 self.rev_prio_map = None 165 self.is_pressure_usable() 166 167 def is_pressure_usable(self): 168 """ 169 If monitoring pressure, return True if pressure files can be open and read. For example 170 openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported) 171 is returned. 172 """ 173 if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure: 174 try: 175 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 176 open("/proc/pressure/io") as io_pressure_fds, \ 177 open("/proc/pressure/memory") as memory_pressure_fds: 178 179 self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 180 self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 181 self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 182 self.prev_pressure_time = time.time() 183 self.check_pressure = True 184 except: 185 bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure") 186 self.check_pressure = False 187 else: 188 self.check_pressure = False 189 190 def exceeds_max_pressure(self): 191 """ 192 Monitor the difference in total pressure at least once per second, if 193 BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold. 194 """ 195 if self.check_pressure: 196 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 197 open("/proc/pressure/io") as io_pressure_fds, \ 198 open("/proc/pressure/memory") as memory_pressure_fds: 199 # extract "total" from /proc/pressure/{cpu|io} 200 curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 201 curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 202 curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 203 now = time.time() 204 tdiff = now - self.prev_pressure_time 205 psi_accumulation_interval = 1.0 206 cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff 207 io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff 208 memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff 209 exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure 210 exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure 211 exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure 212 213 if tdiff > psi_accumulation_interval: 214 self.prev_cpu_pressure = curr_cpu_pressure 215 self.prev_io_pressure = curr_io_pressure 216 self.prev_memory_pressure = curr_memory_pressure 217 self.prev_pressure_time = now 218 219 pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure) 220 pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure) 221 if hasattr(self, "pressure_state") and pressure_state != self.pressure_state: 222 bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))) 223 self.pressure_state = pressure_state 224 return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure) 225 elif self.rq.max_loadfactor: 226 limit = False 227 loadfactor = float(os.getloadavg()[0]) / os.cpu_count() 228 # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor)) 229 if loadfactor > self.rq.max_loadfactor: 230 limit = True 231 if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit: 232 bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)) 233 self.loadfactor_limit = limit 234 return limit 235 return False 236 237 def next_buildable_task(self): 238 """ 239 Return the id of the first task we find that is buildable 240 """ 241 # Once tasks are running we don't need to worry about them again 242 self.buildable.difference_update(self.rq.runq_running) 243 buildable = set(self.buildable) 244 buildable.difference_update(self.rq.holdoff_tasks) 245 buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered) 246 if not buildable: 247 return None 248 249 # Bitbake requires that at least one task be active. Only check for pressure if 250 # this is the case, otherwise the pressure limitation could result in no tasks 251 # being active and no new tasks started thereby, at times, breaking the scheduler. 252 if self.rq.stats.active and self.exceeds_max_pressure(): 253 return None 254 255 # Filter out tasks that have a max number of threads that have been exceeded 256 skip_buildable = {} 257 for running in self.rq.runq_running.difference(self.rq.runq_complete): 258 rtaskname = taskname_from_tid(running) 259 if rtaskname not in self.skip_maxthread: 260 self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads") 261 if not self.skip_maxthread[rtaskname]: 262 continue 263 if rtaskname in skip_buildable: 264 skip_buildable[rtaskname] += 1 265 else: 266 skip_buildable[rtaskname] = 1 267 268 if len(buildable) == 1: 269 tid = buildable.pop() 270 taskname = taskname_from_tid(tid) 271 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 272 return None 273 stamp = self.stamps[tid] 274 if stamp not in self.rq.build_stamps.values(): 275 return tid 276 277 if not self.rev_prio_map: 278 self.rev_prio_map = {} 279 for tid in self.rqdata.runtaskentries: 280 self.rev_prio_map[tid] = self.prio_map.index(tid) 281 282 best = None 283 bestprio = None 284 for tid in buildable: 285 prio = self.rev_prio_map[tid] 286 if bestprio is None or bestprio > prio: 287 taskname = taskname_from_tid(tid) 288 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 289 continue 290 stamp = self.stamps[tid] 291 if stamp in self.rq.build_stamps.values(): 292 continue 293 bestprio = prio 294 best = tid 295 296 return best 297 298 def next(self): 299 """ 300 Return the id of the task we should build next 301 """ 302 if self.rq.can_start_task(): 303 return self.next_buildable_task() 304 305 def newbuildable(self, task): 306 self.buildable.add(task) 307 308 def removebuildable(self, task): 309 self.buildable.remove(task) 310 311 def describe_task(self, taskid): 312 result = 'ID %s' % taskid 313 if self.rev_prio_map: 314 result = result + (' pri %d' % self.rev_prio_map[taskid]) 315 return result 316 317 def dump_prio(self, comment): 318 bb.debug(3, '%s (most important first):\n%s' % 319 (comment, 320 '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for 321 index, taskid in enumerate(self.prio_map)]))) 322 323class RunQueueSchedulerSpeed(RunQueueScheduler): 324 """ 325 A scheduler optimised for speed. The priority map is sorted by task weight, 326 heavier weighted tasks (tasks needed by the most other tasks) are run first. 327 """ 328 name = "speed" 329 330 def __init__(self, runqueue, rqdata): 331 """ 332 The priority map is sorted by task weight. 333 """ 334 RunQueueScheduler.__init__(self, runqueue, rqdata) 335 336 weights = {} 337 for tid in self.rqdata.runtaskentries: 338 weight = self.rqdata.runtaskentries[tid].weight 339 if not weight in weights: 340 weights[weight] = [] 341 weights[weight].append(tid) 342 343 self.prio_map = [] 344 for weight in sorted(weights): 345 for w in weights[weight]: 346 self.prio_map.append(w) 347 348 self.prio_map.reverse() 349 350class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): 351 """ 352 A scheduler optimised to complete .bb files as quickly as possible. The 353 priority map is sorted by task weight, but then reordered so once a given 354 .bb file starts to build, it's completed as quickly as possible by 355 running all tasks related to the same .bb file one after the after. 356 This works well where disk space is at a premium and classes like OE's 357 rm_work are in force. 358 """ 359 name = "completion" 360 361 def __init__(self, runqueue, rqdata): 362 super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata) 363 364 # Extract list of tasks for each recipe, with tasks sorted 365 # ascending from "must run first" (typically do_fetch) to 366 # "runs last" (do_build). The speed scheduler prioritizes 367 # tasks that must run first before the ones that run later; 368 # this is what we depend on here. 369 task_lists = {} 370 for taskid in self.prio_map: 371 fn, taskname = taskid.rsplit(':', 1) 372 task_lists.setdefault(fn, []).append(taskname) 373 374 # Now unify the different task lists. The strategy is that 375 # common tasks get skipped and new ones get inserted after the 376 # preceeding common one(s) as they are found. Because task 377 # lists should differ only by their number of tasks, but not 378 # the ordering of the common tasks, this should result in a 379 # deterministic result that is a superset of the individual 380 # task ordering. 381 all_tasks = [] 382 for recipe, new_tasks in task_lists.items(): 383 index = 0 384 old_task = all_tasks[index] if index < len(all_tasks) else None 385 for new_task in new_tasks: 386 if old_task == new_task: 387 # Common task, skip it. This is the fast-path which 388 # avoids a full search. 389 index += 1 390 old_task = all_tasks[index] if index < len(all_tasks) else None 391 else: 392 try: 393 index = all_tasks.index(new_task) 394 # Already present, just not at the current 395 # place. We re-synchronized by changing the 396 # index so that it matches again. Now 397 # move on to the next existing task. 398 index += 1 399 old_task = all_tasks[index] if index < len(all_tasks) else None 400 except ValueError: 401 # Not present. Insert before old_task, which 402 # remains the same (but gets shifted back). 403 all_tasks.insert(index, new_task) 404 index += 1 405 bb.debug(3, 'merged task list: %s' % all_tasks) 406 407 # Now reverse the order so that tasks that finish the work on one 408 # recipe are considered more imporant (= come first). The ordering 409 # is now so that do_build is most important. 410 all_tasks.reverse() 411 412 # Group tasks of the same kind before tasks of less important 413 # kinds at the head of the queue (because earlier = lower 414 # priority number = runs earlier), while preserving the 415 # ordering by recipe. If recipe foo is more important than 416 # bar, then the goal is to work on foo's do_populate_sysroot 417 # before bar's do_populate_sysroot and on the more important 418 # tasks of foo before any of the less important tasks in any 419 # other recipe (if those other recipes are more important than 420 # foo). 421 # 422 # All of this only applies when tasks are runable. Explicit 423 # dependencies still override this ordering by priority. 424 # 425 # Here's an example why this priority re-ordering helps with 426 # minimizing disk usage. Consider a recipe foo with a higher 427 # priority than bar where foo DEPENDS on bar. Then the 428 # implicit rule (from base.bbclass) is that foo's do_configure 429 # depends on bar's do_populate_sysroot. This ensures that 430 # bar's do_populate_sysroot gets done first. Normally the 431 # tasks from foo would continue to run once that is done, and 432 # bar only gets completed and cleaned up later. By ordering 433 # bar's task that depend on bar's do_populate_sysroot before foo's 434 # do_configure, that problem gets avoided. 435 task_index = 0 436 self.dump_prio('original priorities') 437 for task in all_tasks: 438 for index in range(task_index, self.numTasks): 439 taskid = self.prio_map[index] 440 taskname = taskid.rsplit(':', 1)[1] 441 if taskname == task: 442 del self.prio_map[index] 443 self.prio_map.insert(task_index, taskid) 444 task_index += 1 445 self.dump_prio('completion priorities') 446 447class RunTaskEntry(object): 448 def __init__(self): 449 self.depends = set() 450 self.revdeps = set() 451 self.hash = None 452 self.unihash = None 453 self.task = None 454 self.weight = 1 455 456class RunQueueData: 457 """ 458 BitBake Run Queue implementation 459 """ 460 def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets): 461 self.cooker = cooker 462 self.dataCaches = dataCaches 463 self.taskData = taskData 464 self.targets = targets 465 self.rq = rq 466 self.warn_multi_bb = False 467 468 self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split() 469 self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets) 470 self.setscene_ignore_tasks_checked = False 471 self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1") 472 self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter() 473 474 self.reset() 475 476 def reset(self): 477 self.runtaskentries = {} 478 479 def runq_depends_names(self, ids): 480 ret = [] 481 for id in ids: 482 nam = os.path.basename(id) 483 nam = re.sub("_[^,]*,", ",", nam) 484 ret.extend([nam]) 485 return ret 486 487 def get_task_hash(self, tid): 488 return self.runtaskentries[tid].hash 489 490 def get_task_unihash(self, tid): 491 return self.runtaskentries[tid].unihash 492 493 def get_user_idstring(self, tid, task_name_suffix = ""): 494 return tid + task_name_suffix 495 496 def get_short_user_idstring(self, task, task_name_suffix = ""): 497 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 498 pn = self.dataCaches[mc].pkg_fn[taskfn] 499 taskname = taskname_from_tid(task) + task_name_suffix 500 return "%s:%s" % (pn, taskname) 501 502 def circular_depchains_handler(self, tasks): 503 """ 504 Some tasks aren't buildable, likely due to circular dependency issues. 505 Identify the circular dependencies and print them in a user readable format. 506 """ 507 from copy import deepcopy 508 509 valid_chains = [] 510 explored_deps = {} 511 msgs = [] 512 513 class TooManyLoops(Exception): 514 pass 515 516 def chain_reorder(chain): 517 """ 518 Reorder a dependency chain so the lowest task id is first 519 """ 520 lowest = 0 521 new_chain = [] 522 for entry in range(len(chain)): 523 if chain[entry] < chain[lowest]: 524 lowest = entry 525 new_chain.extend(chain[lowest:]) 526 new_chain.extend(chain[:lowest]) 527 return new_chain 528 529 def chain_compare_equal(chain1, chain2): 530 """ 531 Compare two dependency chains and see if they're the same 532 """ 533 if len(chain1) != len(chain2): 534 return False 535 for index in range(len(chain1)): 536 if chain1[index] != chain2[index]: 537 return False 538 return True 539 540 def chain_array_contains(chain, chain_array): 541 """ 542 Return True if chain_array contains chain 543 """ 544 for ch in chain_array: 545 if chain_compare_equal(ch, chain): 546 return True 547 return False 548 549 def find_chains(tid, prev_chain): 550 prev_chain.append(tid) 551 total_deps = [] 552 total_deps.extend(self.runtaskentries[tid].revdeps) 553 for revdep in self.runtaskentries[tid].revdeps: 554 if revdep in prev_chain: 555 idx = prev_chain.index(revdep) 556 # To prevent duplicates, reorder the chain to start with the lowest taskid 557 # and search through an array of those we've already printed 558 chain = prev_chain[idx:] 559 new_chain = chain_reorder(chain) 560 if not chain_array_contains(new_chain, valid_chains): 561 valid_chains.append(new_chain) 562 msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) 563 for dep in new_chain: 564 msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends))) 565 msgs.append("\n") 566 if len(valid_chains) > 10: 567 msgs.append("Halted dependency loops search after 10 matches.\n") 568 raise TooManyLoops 569 continue 570 scan = False 571 if revdep not in explored_deps: 572 scan = True 573 elif revdep in explored_deps[revdep]: 574 scan = True 575 else: 576 for dep in prev_chain: 577 if dep in explored_deps[revdep]: 578 scan = True 579 if scan: 580 find_chains(revdep, copy.deepcopy(prev_chain)) 581 for dep in explored_deps[revdep]: 582 if dep not in total_deps: 583 total_deps.append(dep) 584 585 explored_deps[tid] = total_deps 586 587 try: 588 for task in tasks: 589 find_chains(task, []) 590 except TooManyLoops: 591 pass 592 593 return msgs 594 595 def calculate_task_weights(self, endpoints): 596 """ 597 Calculate a number representing the "weight" of each task. Heavier weighted tasks 598 have more dependencies and hence should be executed sooner for maximum speed. 599 600 This function also sanity checks the task list finding tasks that are not 601 possible to execute due to circular dependencies. 602 """ 603 604 numTasks = len(self.runtaskentries) 605 weight = {} 606 deps_left = {} 607 task_done = {} 608 609 for tid in self.runtaskentries: 610 task_done[tid] = False 611 weight[tid] = 1 612 deps_left[tid] = len(self.runtaskentries[tid].revdeps) 613 614 for tid in endpoints: 615 weight[tid] = 10 616 task_done[tid] = True 617 618 while True: 619 next_points = [] 620 for tid in endpoints: 621 for revdep in self.runtaskentries[tid].depends: 622 weight[revdep] = weight[revdep] + weight[tid] 623 deps_left[revdep] = deps_left[revdep] - 1 624 if deps_left[revdep] == 0: 625 next_points.append(revdep) 626 task_done[revdep] = True 627 endpoints = next_points 628 if not next_points: 629 break 630 631 # Circular dependency sanity check 632 problem_tasks = [] 633 for tid in self.runtaskentries: 634 if task_done[tid] is False or deps_left[tid] != 0: 635 problem_tasks.append(tid) 636 logger.debug2("Task %s is not buildable", tid) 637 logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) 638 self.runtaskentries[tid].weight = weight[tid] 639 640 if problem_tasks: 641 message = "%s unbuildable tasks were found.\n" % len(problem_tasks) 642 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" 643 message = message + "Identifying dependency loops (this may take a short while)...\n" 644 logger.error(message) 645 646 msgs = self.circular_depchains_handler(problem_tasks) 647 648 message = "\n" 649 for msg in msgs: 650 message = message + msg 651 bb.msg.fatal("RunQueue", message) 652 653 return weight 654 655 def prepare(self): 656 """ 657 Turn a set of taskData into a RunQueue and compute data needed 658 to optimise the execution order. 659 """ 660 661 runq_build = {} 662 recursivetasks = {} 663 recursiveitasks = {} 664 recursivetasksselfref = set() 665 666 taskData = self.taskData 667 668 found = False 669 for mc in self.taskData: 670 if taskData[mc].taskentries: 671 found = True 672 break 673 if not found: 674 # Nothing to do 675 return 0 676 677 bb.parse.siggen.setup_datacache(self.dataCaches) 678 679 self.init_progress_reporter.start() 680 self.init_progress_reporter.next_stage() 681 bb.event.check_for_interrupts(self.cooker.data) 682 683 # Step A - Work out a list of tasks to run 684 # 685 # Taskdata gives us a list of possible providers for every build and run 686 # target ordered by priority. It also gives information on each of those 687 # providers. 688 # 689 # To create the actual list of tasks to execute we fix the list of 690 # providers and then resolve the dependencies into task IDs. This 691 # process is repeated for each type of dependency (tdepends, deptask, 692 # rdeptast, recrdeptask, idepends). 693 694 def add_build_dependencies(depids, tasknames, depends, mc): 695 for depname in depids: 696 # Won't be in build_targets if ASSUME_PROVIDED 697 if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]: 698 continue 699 depdata = taskData[mc].build_targets[depname][0] 700 if depdata is None: 701 continue 702 for taskname in tasknames: 703 t = depdata + ":" + taskname 704 if t in taskData[mc].taskentries: 705 depends.add(t) 706 707 def add_runtime_dependencies(depids, tasknames, depends, mc): 708 for depname in depids: 709 if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]: 710 continue 711 depdata = taskData[mc].run_targets[depname][0] 712 if depdata is None: 713 continue 714 for taskname in tasknames: 715 t = depdata + ":" + taskname 716 if t in taskData[mc].taskentries: 717 depends.add(t) 718 719 def add_mc_dependencies(mc, tid): 720 mcdeps = taskData[mc].get_mcdepends() 721 for dep in mcdeps: 722 mcdependency = dep.split(':') 723 pn = mcdependency[3] 724 frommc = mcdependency[1] 725 mcdep = mcdependency[2] 726 deptask = mcdependency[4] 727 if mcdep not in taskData: 728 bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep)) 729 if mc == frommc: 730 fn = taskData[mcdep].build_targets[pn][0] 731 newdep = '%s:%s' % (fn,deptask) 732 if newdep not in taskData[mcdep].taskentries: 733 bb.fatal("Task mcdepends on non-existent task %s" % (newdep)) 734 taskData[mc].taskentries[tid].tdepends.append(newdep) 735 736 for mc in taskData: 737 for tid in taskData[mc].taskentries: 738 739 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 740 #runtid = build_tid(mc, fn, taskname) 741 742 #logger.debug2("Processing %s,%s:%s", mc, fn, taskname) 743 744 depends = set() 745 task_deps = self.dataCaches[mc].task_deps[taskfn] 746 747 self.runtaskentries[tid] = RunTaskEntry() 748 749 if fn in taskData[mc].failed_fns: 750 continue 751 752 # We add multiconfig dependencies before processing internal task deps (tdepends) 753 if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']: 754 add_mc_dependencies(mc, tid) 755 756 # Resolve task internal dependencies 757 # 758 # e.g. addtask before X after Y 759 for t in taskData[mc].taskentries[tid].tdepends: 760 (depmc, depfn, deptaskname, _) = split_tid_mcfn(t) 761 depends.add(build_tid(depmc, depfn, deptaskname)) 762 763 # Resolve 'deptask' dependencies 764 # 765 # e.g. do_sometask[deptask] = "do_someothertask" 766 # (makes sure sometask runs after someothertask of all DEPENDS) 767 if 'deptask' in task_deps and taskname in task_deps['deptask']: 768 tasknames = task_deps['deptask'][taskname].split() 769 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 770 771 # Resolve 'rdeptask' dependencies 772 # 773 # e.g. do_sometask[rdeptask] = "do_someothertask" 774 # (makes sure sometask runs after someothertask of all RDEPENDS) 775 if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']: 776 tasknames = task_deps['rdeptask'][taskname].split() 777 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 778 779 # Resolve inter-task dependencies 780 # 781 # e.g. do_sometask[depends] = "targetname:do_someothertask" 782 # (makes sure sometask runs after targetname's someothertask) 783 idepends = taskData[mc].taskentries[tid].idepends 784 for (depname, idependtask) in idepends: 785 if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps: 786 # Won't be in build_targets if ASSUME_PROVIDED 787 depdata = taskData[mc].build_targets[depname][0] 788 if depdata is not None: 789 t = depdata + ":" + idependtask 790 depends.add(t) 791 if t not in taskData[mc].taskentries: 792 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 793 irdepends = taskData[mc].taskentries[tid].irdepends 794 for (depname, idependtask) in irdepends: 795 if depname in taskData[mc].run_targets: 796 # Won't be in run_targets if ASSUME_PROVIDED 797 if not taskData[mc].run_targets[depname]: 798 continue 799 depdata = taskData[mc].run_targets[depname][0] 800 if depdata is not None: 801 t = depdata + ":" + idependtask 802 depends.add(t) 803 if t not in taskData[mc].taskentries: 804 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 805 806 # Resolve recursive 'recrdeptask' dependencies (Part A) 807 # 808 # e.g. do_sometask[recrdeptask] = "do_someothertask" 809 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 810 # We cover the recursive part of the dependencies below 811 if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']: 812 tasknames = task_deps['recrdeptask'][taskname].split() 813 recursivetasks[tid] = tasknames 814 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 815 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 816 if taskname in tasknames: 817 recursivetasksselfref.add(tid) 818 819 if 'recideptask' in task_deps and taskname in task_deps['recideptask']: 820 recursiveitasks[tid] = [] 821 for t in task_deps['recideptask'][taskname].split(): 822 newdep = build_tid(mc, fn, t) 823 recursiveitasks[tid].append(newdep) 824 825 self.runtaskentries[tid].depends = depends 826 # Remove all self references 827 self.runtaskentries[tid].depends.discard(tid) 828 829 #self.dump_data() 830 831 self.init_progress_reporter.next_stage() 832 bb.event.check_for_interrupts(self.cooker.data) 833 834 # Resolve recursive 'recrdeptask' dependencies (Part B) 835 # 836 # e.g. do_sometask[recrdeptask] = "do_someothertask" 837 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 838 # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed 839 840 # Generating/interating recursive lists of dependencies is painful and potentially slow 841 # Precompute recursive task dependencies here by: 842 # a) create a temp list of reverse dependencies (revdeps) 843 # b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0) 844 # c) combine the total list of dependencies in cumulativedeps 845 # d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower) 846 847 848 revdeps = {} 849 deps = {} 850 cumulativedeps = {} 851 for tid in self.runtaskentries: 852 deps[tid] = set(self.runtaskentries[tid].depends) 853 revdeps[tid] = set() 854 cumulativedeps[tid] = set() 855 # Generate a temp list of reverse dependencies 856 for tid in self.runtaskentries: 857 for dep in self.runtaskentries[tid].depends: 858 revdeps[dep].add(tid) 859 # Find the dependency chain endpoints 860 endpoints = set() 861 for tid in self.runtaskentries: 862 if not deps[tid]: 863 endpoints.add(tid) 864 # Iterate the chains collating dependencies 865 while endpoints: 866 next = set() 867 for tid in endpoints: 868 for dep in revdeps[tid]: 869 cumulativedeps[dep].add(fn_from_tid(tid)) 870 cumulativedeps[dep].update(cumulativedeps[tid]) 871 if tid in deps[dep]: 872 deps[dep].remove(tid) 873 if not deps[dep]: 874 next.add(dep) 875 endpoints = next 876 #for tid in deps: 877 # if deps[tid]: 878 # bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid])) 879 880 # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to 881 # resolve these recursively until we aren't adding any further extra dependencies 882 extradeps = True 883 while extradeps: 884 extradeps = 0 885 for tid in recursivetasks: 886 tasknames = recursivetasks[tid] 887 888 totaldeps = set(self.runtaskentries[tid].depends) 889 if tid in recursiveitasks: 890 totaldeps.update(recursiveitasks[tid]) 891 for dep in recursiveitasks[tid]: 892 if dep not in self.runtaskentries: 893 continue 894 totaldeps.update(self.runtaskentries[dep].depends) 895 896 deps = set() 897 for dep in totaldeps: 898 if dep in cumulativedeps: 899 deps.update(cumulativedeps[dep]) 900 901 for t in deps: 902 for taskname in tasknames: 903 newtid = t + ":" + taskname 904 if newtid == tid: 905 continue 906 if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends: 907 extradeps += 1 908 self.runtaskentries[tid].depends.add(newtid) 909 910 # Handle recursive tasks which depend upon other recursive tasks 911 deps = set() 912 for dep in self.runtaskentries[tid].depends.intersection(recursivetasks): 913 deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends)) 914 for newtid in deps: 915 for taskname in tasknames: 916 if not newtid.endswith(":" + taskname): 917 continue 918 if newtid in self.runtaskentries: 919 extradeps += 1 920 self.runtaskentries[tid].depends.add(newtid) 921 922 bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps) 923 924 # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work 925 for tid in recursivetasksselfref: 926 self.runtaskentries[tid].depends.difference_update(recursivetasksselfref) 927 928 self.init_progress_reporter.next_stage() 929 bb.event.check_for_interrupts(self.cooker.data) 930 931 #self.dump_data() 932 933 # Step B - Mark all active tasks 934 # 935 # Start with the tasks we were asked to run and mark all dependencies 936 # as active too. If the task is to be 'forced', clear its stamp. Once 937 # all active tasks are marked, prune the ones we don't need. 938 939 logger.verbose("Marking Active Tasks") 940 941 def mark_active(tid, depth): 942 """ 943 Mark an item as active along with its depends 944 (calls itself recursively) 945 """ 946 947 if tid in runq_build: 948 return 949 950 runq_build[tid] = 1 951 952 depends = self.runtaskentries[tid].depends 953 for depend in depends: 954 mark_active(depend, depth+1) 955 956 def invalidate_task(tid, error_nostamp): 957 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 958 taskdep = self.dataCaches[mc].task_deps[taskfn] 959 if fn + ":" + taskname not in taskData[mc].taskentries: 960 logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) 961 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 962 if error_nostamp: 963 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) 964 else: 965 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) 966 else: 967 logger.verbose("Invalidate task %s, %s", taskname, fn) 968 bb.parse.siggen.invalidate_task(taskname, taskfn) 969 970 self.target_tids = [] 971 for (mc, target, task, fn) in self.targets: 972 973 if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]: 974 continue 975 976 if target in taskData[mc].failed_deps: 977 continue 978 979 parents = False 980 if task.endswith('-'): 981 parents = True 982 task = task[:-1] 983 984 if fn in taskData[mc].failed_fns: 985 continue 986 987 # fn already has mc prefix 988 tid = fn + ":" + task 989 self.target_tids.append(tid) 990 if tid not in taskData[mc].taskentries: 991 import difflib 992 tasks = [] 993 for x in taskData[mc].taskentries: 994 if x.startswith(fn + ":"): 995 tasks.append(taskname_from_tid(x)) 996 close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7) 997 if close_matches: 998 extra = ". Close matches:\n %s" % "\n ".join(close_matches) 999 else: 1000 extra = "" 1001 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra)) 1002 1003 # For tasks called "XXXX-", ony run their dependencies 1004 if parents: 1005 for i in self.runtaskentries[tid].depends: 1006 mark_active(i, 1) 1007 else: 1008 mark_active(tid, 1) 1009 1010 self.init_progress_reporter.next_stage() 1011 bb.event.check_for_interrupts(self.cooker.data) 1012 1013 # Step C - Prune all inactive tasks 1014 # 1015 # Once all active tasks are marked, prune the ones we don't need. 1016 1017 # Handle --runall 1018 if self.cooker.configuration.runall: 1019 # re-run the mark_active and then drop unused tasks from new list 1020 1021 runall_tids = set() 1022 added = True 1023 while added: 1024 reduced_tasklist = set(self.runtaskentries.keys()) 1025 for tid in list(self.runtaskentries.keys()): 1026 if tid not in runq_build: 1027 reduced_tasklist.remove(tid) 1028 runq_build = {} 1029 1030 orig = runall_tids 1031 runall_tids = set() 1032 for task in self.cooker.configuration.runall: 1033 if not task.startswith("do_"): 1034 task = "do_{0}".format(task) 1035 for tid in reduced_tasklist: 1036 wanttid = "{0}:{1}".format(fn_from_tid(tid), task) 1037 if wanttid in self.runtaskentries: 1038 runall_tids.add(wanttid) 1039 1040 for tid in list(runall_tids): 1041 mark_active(tid, 1) 1042 self.target_tids.append(tid) 1043 if self.cooker.configuration.force: 1044 invalidate_task(tid, False) 1045 added = runall_tids - orig 1046 1047 delcount = set() 1048 for tid in list(self.runtaskentries.keys()): 1049 if tid not in runq_build: 1050 delcount.add(tid) 1051 del self.runtaskentries[tid] 1052 1053 if self.cooker.configuration.runall: 1054 if not self.runtaskentries: 1055 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets))) 1056 1057 self.init_progress_reporter.next_stage() 1058 bb.event.check_for_interrupts(self.cooker.data) 1059 1060 # Handle runonly 1061 if self.cooker.configuration.runonly: 1062 # re-run the mark_active and then drop unused tasks from new list 1063 runq_build = {} 1064 1065 for task in self.cooker.configuration.runonly: 1066 if not task.startswith("do_"): 1067 task = "do_{0}".format(task) 1068 runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task] 1069 1070 for tid in runonly_tids: 1071 mark_active(tid, 1) 1072 if self.cooker.configuration.force: 1073 invalidate_task(tid, False) 1074 1075 for tid in list(self.runtaskentries.keys()): 1076 if tid not in runq_build: 1077 delcount.add(tid) 1078 del self.runtaskentries[tid] 1079 1080 if not self.runtaskentries: 1081 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets))) 1082 1083 # 1084 # Step D - Sanity checks and computation 1085 # 1086 1087 # Check to make sure we still have tasks to run 1088 if not self.runtaskentries: 1089 if not taskData[''].halt: 1090 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") 1091 else: 1092 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") 1093 1094 logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries)) 1095 1096 logger.verbose("Assign Weightings") 1097 1098 self.init_progress_reporter.next_stage() 1099 bb.event.check_for_interrupts(self.cooker.data) 1100 1101 # Generate a list of reverse dependencies to ease future calculations 1102 for tid in self.runtaskentries: 1103 for dep in self.runtaskentries[tid].depends: 1104 self.runtaskentries[dep].revdeps.add(tid) 1105 1106 self.init_progress_reporter.next_stage() 1107 bb.event.check_for_interrupts(self.cooker.data) 1108 1109 # Identify tasks at the end of dependency chains 1110 # Error on circular dependency loops (length two) 1111 endpoints = [] 1112 for tid in self.runtaskentries: 1113 revdeps = self.runtaskentries[tid].revdeps 1114 if not revdeps: 1115 endpoints.append(tid) 1116 for dep in revdeps: 1117 if dep in self.runtaskentries[tid].depends: 1118 bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep)) 1119 1120 1121 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) 1122 1123 self.init_progress_reporter.next_stage() 1124 bb.event.check_for_interrupts(self.cooker.data) 1125 1126 # Calculate task weights 1127 # Check of higher length circular dependencies 1128 self.runq_weight = self.calculate_task_weights(endpoints) 1129 1130 self.init_progress_reporter.next_stage() 1131 bb.event.check_for_interrupts(self.cooker.data) 1132 1133 # Sanity Check - Check for multiple tasks building the same provider 1134 for mc in self.dataCaches: 1135 prov_list = {} 1136 seen_fn = [] 1137 for tid in self.runtaskentries: 1138 (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1139 if taskfn in seen_fn: 1140 continue 1141 if mc != tidmc: 1142 continue 1143 seen_fn.append(taskfn) 1144 for prov in self.dataCaches[mc].fn_provides[taskfn]: 1145 if prov not in prov_list: 1146 prov_list[prov] = [taskfn] 1147 elif taskfn not in prov_list[prov]: 1148 prov_list[prov].append(taskfn) 1149 for prov in prov_list: 1150 if len(prov_list[prov]) < 2: 1151 continue 1152 if prov in self.multi_provider_allowed: 1153 continue 1154 seen_pn = [] 1155 # If two versions of the same PN are being built its fatal, we don't support it. 1156 for fn in prov_list[prov]: 1157 pn = self.dataCaches[mc].pkg_fn[fn] 1158 if pn not in seen_pn: 1159 seen_pn.append(pn) 1160 else: 1161 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) 1162 msgs = ["Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))] 1163 # 1164 # Construct a list of things which uniquely depend on each provider 1165 # since this may help the user figure out which dependency is triggering this warning 1166 # 1167 msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.") 1168 deplist = {} 1169 commondeps = None 1170 for provfn in prov_list[prov]: 1171 deps = set() 1172 for tid in self.runtaskentries: 1173 fn = fn_from_tid(tid) 1174 if fn != provfn: 1175 continue 1176 for dep in self.runtaskentries[tid].revdeps: 1177 fn = fn_from_tid(dep) 1178 if fn == provfn: 1179 continue 1180 deps.add(dep) 1181 if not commondeps: 1182 commondeps = set(deps) 1183 else: 1184 commondeps &= deps 1185 deplist[provfn] = deps 1186 for provfn in deplist: 1187 msgs.append("\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps))) 1188 # 1189 # Construct a list of provides and runtime providers for each recipe 1190 # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC) 1191 # 1192 msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.") 1193 provide_results = {} 1194 rprovide_results = {} 1195 commonprovs = None 1196 commonrprovs = None 1197 for provfn in prov_list[prov]: 1198 provides = set(self.dataCaches[mc].fn_provides[provfn]) 1199 rprovides = set() 1200 for rprovide in self.dataCaches[mc].rproviders: 1201 if provfn in self.dataCaches[mc].rproviders[rprovide]: 1202 rprovides.add(rprovide) 1203 for package in self.dataCaches[mc].packages: 1204 if provfn in self.dataCaches[mc].packages[package]: 1205 rprovides.add(package) 1206 for package in self.dataCaches[mc].packages_dynamic: 1207 if provfn in self.dataCaches[mc].packages_dynamic[package]: 1208 rprovides.add(package) 1209 if not commonprovs: 1210 commonprovs = set(provides) 1211 else: 1212 commonprovs &= provides 1213 provide_results[provfn] = provides 1214 if not commonrprovs: 1215 commonrprovs = set(rprovides) 1216 else: 1217 commonrprovs &= rprovides 1218 rprovide_results[provfn] = rprovides 1219 #msgs.append("\nCommon provides:\n %s" % ("\n ".join(commonprovs))) 1220 #msgs.append("\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs))) 1221 for provfn in prov_list[prov]: 1222 msgs.append("\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs))) 1223 msgs.append("\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs))) 1224 1225 if self.warn_multi_bb: 1226 logger.verbnote("".join(msgs)) 1227 else: 1228 logger.error("".join(msgs)) 1229 1230 self.init_progress_reporter.next_stage() 1231 self.init_progress_reporter.next_stage() 1232 bb.event.check_for_interrupts(self.cooker.data) 1233 1234 # Iterate over the task list looking for tasks with a 'setscene' function 1235 self.runq_setscene_tids = set() 1236 if not self.cooker.configuration.nosetscene: 1237 for tid in self.runtaskentries: 1238 (mc, fn, taskname, _) = split_tid_mcfn(tid) 1239 setscenetid = tid + "_setscene" 1240 if setscenetid not in taskData[mc].taskentries: 1241 continue 1242 self.runq_setscene_tids.add(tid) 1243 1244 self.init_progress_reporter.next_stage() 1245 bb.event.check_for_interrupts(self.cooker.data) 1246 1247 # Invalidate task if force mode active 1248 if self.cooker.configuration.force: 1249 for tid in self.target_tids: 1250 invalidate_task(tid, False) 1251 1252 # Invalidate task if invalidate mode active 1253 if self.cooker.configuration.invalidate_stamp: 1254 for tid in self.target_tids: 1255 fn = fn_from_tid(tid) 1256 for st in self.cooker.configuration.invalidate_stamp.split(','): 1257 if not st.startswith("do_"): 1258 st = "do_%s" % st 1259 invalidate_task(fn + ":" + st, True) 1260 1261 self.init_progress_reporter.next_stage() 1262 bb.event.check_for_interrupts(self.cooker.data) 1263 1264 # Create and print to the logs a virtual/xxxx -> PN (fn) table 1265 for mc in taskData: 1266 virtmap = taskData[mc].get_providermap(prefix="virtual/") 1267 virtpnmap = {} 1268 for v in virtmap: 1269 virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]] 1270 bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v])) 1271 if hasattr(bb.parse.siggen, "tasks_resolved"): 1272 bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc]) 1273 1274 self.init_progress_reporter.next_stage() 1275 bb.event.check_for_interrupts(self.cooker.data) 1276 1277 bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) 1278 1279 starttime = time.time() 1280 lasttime = starttime 1281 1282 # Iterate over the task list and call into the siggen code 1283 dealtwith = set() 1284 todeal = set(self.runtaskentries) 1285 while todeal: 1286 ready = set() 1287 for tid in todeal.copy(): 1288 if not (self.runtaskentries[tid].depends - dealtwith): 1289 self.runtaskentries[tid].taskhash_deps = bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) 1290 # get_taskhash for a given tid *must* be called before get_unihash* below 1291 self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) 1292 ready.add(tid) 1293 unihashes = bb.parse.siggen.get_unihashes(ready) 1294 for tid in ready: 1295 dealtwith.add(tid) 1296 todeal.remove(tid) 1297 self.runtaskentries[tid].unihash = unihashes[tid] 1298 1299 bb.event.check_for_interrupts(self.cooker.data) 1300 1301 if time.time() > (lasttime + 30): 1302 lasttime = time.time() 1303 hashequiv_logger.verbose("Initial setup loop progress: %s of %s in %s" % (len(todeal), len(self.runtaskentries), lasttime - starttime)) 1304 1305 endtime = time.time() 1306 if (endtime-starttime > 60): 1307 hashequiv_logger.verbose("Initial setup loop took: %s" % (endtime-starttime)) 1308 1309 bb.parse.siggen.writeout_file_checksum_cache() 1310 1311 #self.dump_data() 1312 return len(self.runtaskentries) 1313 1314 def dump_data(self): 1315 """ 1316 Dump some debug information on the internal data structures 1317 """ 1318 logger.debug3("run_tasks:") 1319 for tid in self.runtaskentries: 1320 logger.debug3(" %s: %s Deps %s RevDeps %s", tid, 1321 self.runtaskentries[tid].weight, 1322 self.runtaskentries[tid].depends, 1323 self.runtaskentries[tid].revdeps) 1324 1325class RunQueueWorker(): 1326 def __init__(self, process, pipe): 1327 self.process = process 1328 self.pipe = pipe 1329 1330class RunQueue: 1331 def __init__(self, cooker, cfgData, dataCaches, taskData, targets): 1332 1333 self.cooker = cooker 1334 self.cfgData = cfgData 1335 self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets) 1336 1337 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None 1338 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None 1339 1340 self.state = runQueuePrepare 1341 1342 # For disk space monitor 1343 # Invoked at regular time intervals via the bitbake heartbeat event 1344 # while the build is running. We generate a unique name for the handler 1345 # here, just in case that there ever is more than one RunQueue instance, 1346 # start the handler when reaching runQueueSceneInit, and stop it when 1347 # done with the build. 1348 self.dm = monitordisk.diskMonitor(cfgData) 1349 self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self)) 1350 self.dm_event_handler_registered = False 1351 self.rqexe = None 1352 self.worker = {} 1353 self.fakeworker = {} 1354 1355 @staticmethod 1356 def send_pickled_data(worker, data, name): 1357 msg = bytearray() 1358 msg.extend(b"<" + name.encode() + b">") 1359 pickled_data = pickle.dumps(data) 1360 msg.extend(len(pickled_data).to_bytes(4, 'big')) 1361 msg.extend(pickled_data) 1362 msg.extend(b"</" + name.encode() + b">") 1363 worker.stdin.write(msg) 1364 1365 def _start_worker(self, mc, fakeroot = False, rqexec = None): 1366 logger.debug("Starting bitbake-worker") 1367 magic = "decafbad" 1368 if self.cooker.configuration.profile: 1369 magic = "decafbadbad" 1370 fakerootlogs = None 1371 1372 workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker") 1373 if fakeroot: 1374 magic = magic + "beef" 1375 mcdata = self.cooker.databuilder.mcdata[mc] 1376 fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD")) 1377 fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split() 1378 env = os.environ.copy() 1379 for key, value in (var.split('=',1) for var in fakerootenv): 1380 env[key] = value 1381 worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) 1382 fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs 1383 else: 1384 worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) 1385 bb.utils.nonblockingfd(worker.stdout) 1386 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs) 1387 1388 workerdata = { 1389 "sigdata" : bb.parse.siggen.get_taskdata(), 1390 "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, 1391 "build_verbose_shell" : self.cooker.configuration.build_verbose_shell, 1392 "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout, 1393 "logdefaultdomain" : bb.msg.loggerDefaultDomains, 1394 "prhost" : self.cooker.prhost, 1395 "buildname" : self.cfgData.getVar("BUILDNAME"), 1396 "date" : self.cfgData.getVar("DATE"), 1397 "time" : self.cfgData.getVar("TIME"), 1398 "hashservaddr" : self.cooker.hashservaddr, 1399 "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), 1400 } 1401 1402 RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig") 1403 RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata") 1404 RunQueue.send_pickled_data(worker, workerdata, "workerdata") 1405 worker.stdin.flush() 1406 1407 return RunQueueWorker(worker, workerpipe) 1408 1409 def _teardown_worker(self, worker): 1410 if not worker: 1411 return 1412 logger.debug("Teardown for bitbake-worker") 1413 try: 1414 RunQueue.send_pickled_data(worker.process, b"", "quit") 1415 worker.process.stdin.flush() 1416 worker.process.stdin.close() 1417 except IOError: 1418 pass 1419 while worker.process.returncode is None: 1420 worker.pipe.read() 1421 worker.process.poll() 1422 while worker.pipe.read(): 1423 continue 1424 worker.pipe.close() 1425 1426 def start_worker(self, rqexec): 1427 if self.worker: 1428 self.teardown_workers() 1429 self.teardown = False 1430 for mc in self.rqdata.dataCaches: 1431 self.worker[mc] = self._start_worker(mc, False, rqexec) 1432 1433 def start_fakeworker(self, rqexec, mc): 1434 if not mc in self.fakeworker: 1435 self.fakeworker[mc] = self._start_worker(mc, True, rqexec) 1436 1437 def teardown_workers(self): 1438 self.teardown = True 1439 for mc in self.worker: 1440 self._teardown_worker(self.worker[mc]) 1441 self.worker = {} 1442 for mc in self.fakeworker: 1443 self._teardown_worker(self.fakeworker[mc]) 1444 self.fakeworker = {} 1445 1446 def read_workers(self): 1447 for mc in self.worker: 1448 self.worker[mc].pipe.read() 1449 for mc in self.fakeworker: 1450 self.fakeworker[mc].pipe.read() 1451 1452 def active_fds(self): 1453 fds = [] 1454 for mc in self.worker: 1455 fds.append(self.worker[mc].pipe.input) 1456 for mc in self.fakeworker: 1457 fds.append(self.fakeworker[mc].pipe.input) 1458 return fds 1459 1460 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): 1461 def get_timestamp(f): 1462 try: 1463 if not os.access(f, os.F_OK): 1464 return None 1465 return os.stat(f)[stat.ST_MTIME] 1466 except: 1467 return None 1468 1469 (mc, fn, tn, taskfn) = split_tid_mcfn(tid) 1470 if taskname is None: 1471 taskname = tn 1472 1473 stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn) 1474 1475 # If the stamp is missing, it's not current 1476 if not os.access(stampfile, os.F_OK): 1477 logger.debug2("Stampfile %s not available", stampfile) 1478 return False 1479 # If it's a 'nostamp' task, it's not current 1480 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1481 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 1482 logger.debug2("%s.%s is nostamp\n", fn, taskname) 1483 return False 1484 1485 if taskname.endswith("_setscene"): 1486 return True 1487 1488 if cache is None: 1489 cache = {} 1490 1491 iscurrent = True 1492 t1 = get_timestamp(stampfile) 1493 for dep in self.rqdata.runtaskentries[tid].depends: 1494 if iscurrent: 1495 (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep) 1496 stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2) 1497 stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2) 1498 t2 = get_timestamp(stampfile2) 1499 t3 = get_timestamp(stampfile3) 1500 if t3 and not t2: 1501 continue 1502 if t3 and t3 > t2: 1503 continue 1504 if fn == fn2: 1505 if not t2: 1506 logger.debug2('Stampfile %s does not exist', stampfile2) 1507 iscurrent = False 1508 break 1509 if t1 < t2: 1510 logger.debug2('Stampfile %s < %s', stampfile, stampfile2) 1511 iscurrent = False 1512 break 1513 if recurse and iscurrent: 1514 if dep in cache: 1515 iscurrent = cache[dep] 1516 if not iscurrent: 1517 logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) 1518 else: 1519 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) 1520 cache[dep] = iscurrent 1521 if recurse: 1522 cache[tid] = iscurrent 1523 return iscurrent 1524 1525 def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True): 1526 valid = set() 1527 if self.hashvalidate: 1528 sq_data = {} 1529 sq_data['hash'] = {} 1530 sq_data['hashfn'] = {} 1531 sq_data['unihash'] = {} 1532 for tid in tocheck: 1533 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1534 sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash 1535 sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn] 1536 sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash 1537 1538 valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary) 1539 1540 return valid 1541 1542 def validate_hash(self, sq_data, d, siginfo, currentcount, summary): 1543 locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary} 1544 1545 # Metadata has **kwargs so args can be added, sq_data can also gain new fields 1546 call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)" 1547 1548 return bb.utils.better_eval(call, locs) 1549 1550 def _execute_runqueue(self): 1551 """ 1552 Run the tasks in a queue prepared by rqdata.prepare() 1553 Upon failure, optionally try to recover the build using any alternate providers 1554 (if the halt on failure configuration option isn't set) 1555 """ 1556 1557 retval = True 1558 bb.event.check_for_interrupts(self.cooker.data) 1559 1560 if self.state is runQueuePrepare: 1561 # NOTE: if you add, remove or significantly refactor the stages of this 1562 # process then you should recalculate the weightings here. This is quite 1563 # easy to do - just change the next line temporarily to pass debug=True as 1564 # the last parameter and you'll get a printout of the weightings as well 1565 # as a map to the lines where next_stage() was called. Of course this isn't 1566 # critical, but it helps to keep the progress reporting accurate. 1567 self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data, 1568 "Initialising tasks", 1569 [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244]) 1570 if self.rqdata.prepare() == 0: 1571 self.state = runQueueComplete 1572 else: 1573 self.state = runQueueSceneInit 1574 bb.parse.siggen.save_unitaskhashes() 1575 1576 if self.state is runQueueSceneInit: 1577 self.rqdata.init_progress_reporter.next_stage() 1578 1579 # we are ready to run, emit dependency info to any UI or class which 1580 # needs it 1581 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) 1582 self.rqdata.init_progress_reporter.next_stage() 1583 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) 1584 1585 if not self.dm_event_handler_registered: 1586 res = bb.event.register(self.dm_event_handler_name, 1587 lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False, 1588 ('bb.event.HeartbeatEvent',), data=self.cfgData) 1589 self.dm_event_handler_registered = True 1590 1591 self.rqdata.init_progress_reporter.next_stage() 1592 self.rqexe = RunQueueExecute(self) 1593 1594 dumpsigs = self.cooker.configuration.dump_signatures 1595 if dumpsigs: 1596 self.rqdata.init_progress_reporter.finish() 1597 if 'printdiff' in dumpsigs: 1598 self.invalidtasks_dump = self.print_diffscenetasks() 1599 self.state = runQueueDumpSigs 1600 1601 if self.state is runQueueDumpSigs: 1602 dumpsigs = self.cooker.configuration.dump_signatures 1603 retval = self.dump_signatures(dumpsigs) 1604 if retval is False: 1605 if 'printdiff' in dumpsigs: 1606 self.write_diffscenetasks(self.invalidtasks_dump) 1607 self.state = runQueueComplete 1608 1609 if self.state is runQueueSceneInit: 1610 self.start_worker(self.rqexe) 1611 self.rqdata.init_progress_reporter.finish() 1612 1613 # If we don't have any setscene functions, skip execution 1614 if not self.rqdata.runq_setscene_tids: 1615 logger.info('No setscene tasks') 1616 for tid in self.rqdata.runtaskentries: 1617 if not self.rqdata.runtaskentries[tid].depends: 1618 self.rqexe.setbuildable(tid) 1619 self.rqexe.tasks_notcovered.add(tid) 1620 self.rqexe.sqdone = True 1621 logger.info('Executing Tasks') 1622 self.state = runQueueRunning 1623 1624 if self.state is runQueueRunning: 1625 retval = self.rqexe.execute() 1626 1627 if self.state is runQueueCleanUp: 1628 retval = self.rqexe.finish() 1629 1630 build_done = self.state is runQueueComplete or self.state is runQueueFailed 1631 1632 if build_done and self.dm_event_handler_registered: 1633 bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData) 1634 self.dm_event_handler_registered = False 1635 1636 if build_done and self.rqexe: 1637 bb.parse.siggen.save_unitaskhashes() 1638 self.teardown_workers() 1639 if self.rqexe: 1640 if self.rqexe.stats.failed: 1641 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) 1642 else: 1643 # Let's avoid the word "failed" if nothing actually did 1644 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) 1645 1646 if self.state is runQueueFailed: 1647 raise bb.runqueue.TaskFailure(self.rqexe.failed_tids) 1648 1649 if self.state is runQueueComplete: 1650 # All done 1651 return False 1652 1653 # Loop 1654 return retval 1655 1656 def execute_runqueue(self): 1657 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop. 1658 try: 1659 return self._execute_runqueue() 1660 except bb.runqueue.TaskFailure: 1661 raise 1662 except SystemExit: 1663 raise 1664 except bb.BBHandledException: 1665 try: 1666 self.teardown_workers() 1667 except: 1668 pass 1669 self.state = runQueueComplete 1670 raise 1671 except Exception as err: 1672 logger.exception("An uncaught exception occurred in runqueue") 1673 try: 1674 self.teardown_workers() 1675 except: 1676 pass 1677 self.state = runQueueComplete 1678 raise 1679 1680 def finish_runqueue(self, now = False): 1681 if not self.rqexe: 1682 self.state = runQueueComplete 1683 return 1684 1685 if now: 1686 self.rqexe.finish_now() 1687 else: 1688 self.rqexe.finish() 1689 1690 def _rq_dump_sigtid(self, tids): 1691 for tid in tids: 1692 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1693 dataCaches = self.rqdata.dataCaches 1694 bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True) 1695 1696 def dump_signatures(self, options): 1697 if not hasattr(self, "dumpsigs_launched"): 1698 if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset: 1699 bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled") 1700 1701 bb.note("Writing task signature files") 1702 1703 max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1) 1704 def chunkify(l, n): 1705 return [l[i::n] for i in range(n)] 1706 dumpsigs_tids = chunkify(list(self.rqdata.runtaskentries), max_process) 1707 1708 # We cannot use the real multiprocessing.Pool easily due to some local data 1709 # that can't be pickled. This is a cheap multi-process solution. 1710 self.dumpsigs_launched = [] 1711 1712 for tids in dumpsigs_tids: 1713 p = Process(target=self._rq_dump_sigtid, args=(tids, )) 1714 p.start() 1715 self.dumpsigs_launched.append(p) 1716 1717 return 1.0 1718 1719 for q in self.dumpsigs_launched: 1720 # The finished processes are joined when calling is_alive() 1721 if not q.is_alive(): 1722 self.dumpsigs_launched.remove(q) 1723 1724 if self.dumpsigs_launched: 1725 return 1.0 1726 1727 for p in self.dumpsigs_launched: 1728 p.join() 1729 1730 bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options) 1731 1732 return False 1733 1734 def print_diffscenetasks(self): 1735 def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid): 1736 invalidtasks = [] 1737 for t in taskdepends[task].depends: 1738 if t not in valid and t not in visited_invalid: 1739 invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid)) 1740 visited_invalid.add(t) 1741 1742 direct_invalid = [t for t in taskdepends[task].depends if t not in valid] 1743 if not direct_invalid and task not in noexec: 1744 invalidtasks = [task] 1745 return invalidtasks 1746 1747 noexec = [] 1748 tocheck = set() 1749 1750 for tid in self.rqdata.runtaskentries: 1751 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1752 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1753 1754 if 'noexec' in taskdep and taskname in taskdep['noexec']: 1755 noexec.append(tid) 1756 continue 1757 1758 tocheck.add(tid) 1759 1760 valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False) 1761 1762 # Tasks which are both setscene and noexec never care about dependencies 1763 # We therefore find tasks which are setscene and noexec and mark their 1764 # unique dependencies as valid. 1765 for tid in noexec: 1766 if tid not in self.rqdata.runq_setscene_tids: 1767 continue 1768 for dep in self.rqdata.runtaskentries[tid].depends: 1769 hasnoexecparents = True 1770 for dep2 in self.rqdata.runtaskentries[dep].revdeps: 1771 if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec: 1772 continue 1773 hasnoexecparents = False 1774 break 1775 if hasnoexecparents: 1776 valid_new.add(dep) 1777 1778 invalidtasks = set() 1779 1780 toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets]) 1781 for tid in toptasks: 1782 toprocess = set([tid]) 1783 while toprocess: 1784 next = set() 1785 visited_invalid = set() 1786 for t in toprocess: 1787 if t not in valid_new and t not in noexec: 1788 invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid)) 1789 continue 1790 if t in self.rqdata.runq_setscene_tids: 1791 for dep in self.rqexe.sqdata.sq_deps[t]: 1792 next.add(dep) 1793 continue 1794 1795 for dep in self.rqdata.runtaskentries[t].depends: 1796 next.add(dep) 1797 1798 toprocess = next 1799 1800 tasklist = [] 1801 for tid in invalidtasks: 1802 tasklist.append(tid) 1803 1804 if tasklist: 1805 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) 1806 1807 return invalidtasks 1808 1809 def write_diffscenetasks(self, invalidtasks): 1810 bb.siggen.check_siggen_version(bb.siggen) 1811 1812 # Define recursion callback 1813 def recursecb(key, hash1, hash2): 1814 hashes = [hash1, hash2] 1815 bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes)) 1816 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) 1817 bb.debug(1, "Found hashfiles:\n{}".format(hashfiles)) 1818 1819 recout = [] 1820 if len(hashfiles) == 2: 1821 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb) 1822 recout.extend(list(' ' + l for l in out2)) 1823 else: 1824 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) 1825 1826 return recout 1827 1828 1829 for tid in invalidtasks: 1830 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1831 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1832 h = self.rqdata.runtaskentries[tid].unihash 1833 bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname)) 1834 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc]) 1835 bb.debug(1, "Found hashfiles:\n{}".format(matches)) 1836 match = None 1837 for m in matches.values(): 1838 if h in m['path']: 1839 match = m['path'] 1840 if match is None: 1841 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid)) 1842 matches = {k : v for k, v in iter(matches.items()) if h not in k} 1843 matches_local = {k : v for k, v in iter(matches.items()) if h not in k and not v['sstate']} 1844 if matches_local: 1845 matches = matches_local 1846 if matches: 1847 latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path'] 1848 prevh = __find_sha256__.search(latestmatch).group(0) 1849 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) 1850 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, most recent matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) 1851 1852 1853class RunQueueExecute: 1854 1855 def __init__(self, rq): 1856 self.rq = rq 1857 self.cooker = rq.cooker 1858 self.cfgData = rq.cfgData 1859 self.rqdata = rq.rqdata 1860 1861 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) 1862 self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" 1863 self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU") 1864 self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO") 1865 self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY") 1866 self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX") 1867 1868 self.sq_buildable = set() 1869 self.sq_running = set() 1870 self.sq_live = set() 1871 1872 self.updated_taskhash_queue = [] 1873 self.pending_migrations = set() 1874 1875 self.runq_buildable = set() 1876 self.runq_running = set() 1877 self.runq_complete = set() 1878 self.runq_tasksrun = set() 1879 1880 self.build_stamps = {} 1881 self.build_stamps2 = [] 1882 self.failed_tids = [] 1883 self.sq_deferred = {} 1884 self.sq_needed_harddeps = set() 1885 self.sq_harddep_deferred = set() 1886 1887 self.stampcache = {} 1888 1889 self.holdoff_tasks = set() 1890 self.holdoff_need_update = True 1891 self.sqdone = False 1892 1893 self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids)) 1894 1895 if self.number_tasks <= 0: 1896 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) 1897 1898 lower_limit = 1.0 1899 upper_limit = 1000000.0 1900 if self.max_cpu_pressure: 1901 self.max_cpu_pressure = float(self.max_cpu_pressure) 1902 if self.max_cpu_pressure < lower_limit: 1903 bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit)) 1904 if self.max_cpu_pressure > upper_limit: 1905 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure)) 1906 1907 if self.max_io_pressure: 1908 self.max_io_pressure = float(self.max_io_pressure) 1909 if self.max_io_pressure < lower_limit: 1910 bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit)) 1911 if self.max_io_pressure > upper_limit: 1912 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1913 1914 if self.max_memory_pressure: 1915 self.max_memory_pressure = float(self.max_memory_pressure) 1916 if self.max_memory_pressure < lower_limit: 1917 bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit)) 1918 if self.max_memory_pressure > upper_limit: 1919 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1920 1921 if self.max_loadfactor: 1922 self.max_loadfactor = float(self.max_loadfactor) 1923 if self.max_loadfactor <= 0: 1924 bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor)) 1925 1926 # List of setscene tasks which we've covered 1927 self.scenequeue_covered = set() 1928 # List of tasks which are covered (including setscene ones) 1929 self.tasks_covered = set() 1930 self.tasks_scenequeue_done = set() 1931 self.scenequeue_notcovered = set() 1932 self.tasks_notcovered = set() 1933 self.scenequeue_notneeded = set() 1934 1935 schedulers = self.get_schedulers() 1936 for scheduler in schedulers: 1937 if self.scheduler == scheduler.name: 1938 self.sched = scheduler(self, self.rqdata) 1939 logger.debug("Using runqueue scheduler '%s'", scheduler.name) 1940 break 1941 else: 1942 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1943 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1944 1945 #if self.rqdata.runq_setscene_tids: 1946 self.sqdata = SQData() 1947 build_scenequeue_data(self.sqdata, self.rqdata, self) 1948 1949 update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True) 1950 1951 # Compute a list of 'stale' sstate tasks where the current hash does not match the one 1952 # in any stamp files. Pass the list out to metadata as an event. 1953 found = {} 1954 for tid in self.rqdata.runq_setscene_tids: 1955 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1956 stamps = bb.build.find_stale_stamps(taskname, taskfn) 1957 if stamps: 1958 if mc not in found: 1959 found[mc] = {} 1960 found[mc][tid] = stamps 1961 for mc in found: 1962 event = bb.event.StaleSetSceneTasks(found[mc]) 1963 bb.event.fire(event, self.cooker.databuilder.mcdata[mc]) 1964 1965 self.build_taskdepdata_cache() 1966 1967 def runqueue_process_waitpid(self, task, status, fakerootlog=None): 1968 1969 # self.build_stamps[pid] may not exist when use shared work directory. 1970 if task in self.build_stamps: 1971 self.build_stamps2.remove(self.build_stamps[task]) 1972 del self.build_stamps[task] 1973 1974 if task in self.sq_live: 1975 if status != 0: 1976 self.sq_task_fail(task, status) 1977 else: 1978 self.sq_task_complete(task) 1979 self.sq_live.remove(task) 1980 self.stats.updateActiveSetscene(len(self.sq_live)) 1981 else: 1982 if status != 0: 1983 self.task_fail(task, status, fakerootlog=fakerootlog) 1984 else: 1985 self.task_complete(task) 1986 return True 1987 1988 def finish_now(self): 1989 for mc in self.rq.worker: 1990 try: 1991 RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow") 1992 self.rq.worker[mc].process.stdin.flush() 1993 except IOError: 1994 # worker must have died? 1995 pass 1996 for mc in self.rq.fakeworker: 1997 try: 1998 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow") 1999 self.rq.fakeworker[mc].process.stdin.flush() 2000 except IOError: 2001 # worker must have died? 2002 pass 2003 2004 if self.failed_tids: 2005 self.rq.state = runQueueFailed 2006 return 2007 2008 self.rq.state = runQueueComplete 2009 return 2010 2011 def finish(self): 2012 self.rq.state = runQueueCleanUp 2013 2014 active = self.stats.active + len(self.sq_live) 2015 if active > 0: 2016 bb.event.fire(runQueueExitWait(active), self.cfgData) 2017 self.rq.read_workers() 2018 return self.rq.active_fds() 2019 2020 if self.failed_tids: 2021 self.rq.state = runQueueFailed 2022 return True 2023 2024 self.rq.state = runQueueComplete 2025 return True 2026 2027 # Used by setscene only 2028 def check_dependencies(self, task, taskdeps): 2029 if not self.rq.depvalidate: 2030 return False 2031 2032 # Must not edit parent data 2033 taskdeps = set(taskdeps) 2034 2035 taskdata = {} 2036 taskdeps.add(task) 2037 for dep in taskdeps: 2038 (mc, fn, taskname, taskfn) = split_tid_mcfn(dep) 2039 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2040 taskdata[dep] = [pn, taskname, fn] 2041 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)" 2042 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data } 2043 valid = bb.utils.better_eval(call, locs) 2044 return valid 2045 2046 def can_start_task(self): 2047 active = self.stats.active + len(self.sq_live) 2048 can_start = active < self.number_tasks 2049 return can_start 2050 2051 def get_schedulers(self): 2052 schedulers = set(obj for obj in globals().values() 2053 if type(obj) is type and 2054 issubclass(obj, RunQueueScheduler)) 2055 2056 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS") 2057 if user_schedulers: 2058 for sched in user_schedulers.split(): 2059 if not "." in sched: 2060 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) 2061 continue 2062 2063 modname, name = sched.rsplit(".", 1) 2064 try: 2065 module = __import__(modname, fromlist=(name,)) 2066 except ImportError as exc: 2067 bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) 2068 else: 2069 schedulers.add(getattr(module, name)) 2070 return schedulers 2071 2072 def setbuildable(self, task): 2073 self.runq_buildable.add(task) 2074 self.sched.newbuildable(task) 2075 2076 def task_completeoutright(self, task): 2077 """ 2078 Mark a task as completed 2079 Look at the reverse dependencies and mark any task with 2080 completed dependencies as buildable 2081 """ 2082 self.runq_complete.add(task) 2083 for revdep in self.rqdata.runtaskentries[task].revdeps: 2084 if revdep in self.runq_running: 2085 continue 2086 if revdep in self.runq_buildable: 2087 continue 2088 alldeps = True 2089 for dep in self.rqdata.runtaskentries[revdep].depends: 2090 if dep not in self.runq_complete: 2091 alldeps = False 2092 break 2093 if alldeps: 2094 self.setbuildable(revdep) 2095 logger.debug("Marking task %s as buildable", revdep) 2096 2097 found = None 2098 for t in sorted(self.sq_deferred.copy()): 2099 if self.sq_deferred[t] == task: 2100 # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task. 2101 # We shouldn't allow all to run at once as it is prone to races. 2102 if not found: 2103 bb.debug(1, "Deferred task %s now buildable" % t) 2104 del self.sq_deferred[t] 2105 update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2106 found = t 2107 else: 2108 bb.debug(1, "Deferring %s after %s" % (t, found)) 2109 self.sq_deferred[t] = found 2110 2111 def task_complete(self, task): 2112 self.stats.taskCompleted() 2113 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2114 self.task_completeoutright(task) 2115 self.runq_tasksrun.add(task) 2116 2117 def task_fail(self, task, exitcode, fakerootlog=None): 2118 """ 2119 Called when a task has failed 2120 Updates the state engine with the failure 2121 """ 2122 self.stats.taskFailed() 2123 self.failed_tids.append(task) 2124 2125 fakeroot_log = [] 2126 if fakerootlog and os.path.exists(fakerootlog): 2127 with open(fakerootlog) as fakeroot_log_file: 2128 fakeroot_failed = False 2129 for line in reversed(fakeroot_log_file.readlines()): 2130 for fakeroot_error in ['mismatch', 'error', 'fatal']: 2131 if fakeroot_error in line.lower(): 2132 fakeroot_failed = True 2133 if 'doing new pid setup and server start' in line: 2134 break 2135 fakeroot_log.append(line) 2136 2137 if not fakeroot_failed: 2138 fakeroot_log = [] 2139 2140 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData) 2141 2142 if self.rqdata.taskData[''].halt: 2143 self.rq.state = runQueueCleanUp 2144 2145 def task_skip(self, task, reason): 2146 self.runq_running.add(task) 2147 self.setbuildable(task) 2148 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData) 2149 self.task_completeoutright(task) 2150 self.stats.taskSkipped() 2151 self.stats.taskCompleted() 2152 2153 def summarise_scenequeue_errors(self): 2154 err = False 2155 if not self.sqdone: 2156 logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) 2157 completeevent = sceneQueueComplete(self.stats, self.rq) 2158 bb.event.fire(completeevent, self.cfgData) 2159 if self.sq_deferred: 2160 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) 2161 err = True 2162 if self.updated_taskhash_queue: 2163 logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) 2164 err = True 2165 if self.holdoff_tasks: 2166 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) 2167 err = True 2168 2169 for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered): 2170 # No task should end up in both covered and uncovered, that is a bug. 2171 logger.error("Setscene task %s in both covered and notcovered." % tid) 2172 2173 for tid in self.rqdata.runq_setscene_tids: 2174 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2175 err = True 2176 logger.error("Setscene Task %s was never marked as covered or not covered" % tid) 2177 if tid not in self.sq_buildable: 2178 err = True 2179 logger.error("Setscene Task %s was never marked as buildable" % tid) 2180 if tid not in self.sq_running: 2181 err = True 2182 logger.error("Setscene Task %s was never marked as running" % tid) 2183 2184 for x in self.rqdata.runtaskentries: 2185 if x not in self.tasks_covered and x not in self.tasks_notcovered: 2186 logger.error("Task %s was never moved from the setscene queue" % x) 2187 err = True 2188 if x not in self.tasks_scenequeue_done: 2189 logger.error("Task %s was never processed by the setscene code" % x) 2190 err = True 2191 if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable: 2192 logger.error("Task %s was never marked as buildable by the setscene code" % x) 2193 err = True 2194 return err 2195 2196 2197 def execute(self): 2198 """ 2199 Run the tasks in a queue prepared by prepare_runqueue 2200 """ 2201 2202 self.rq.read_workers() 2203 if self.updated_taskhash_queue or self.pending_migrations: 2204 self.process_possible_migrations() 2205 2206 if not hasattr(self, "sorted_setscene_tids"): 2207 # Don't want to sort this set every execution 2208 self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids) 2209 # Resume looping where we left off when we returned to feed the mainloop 2210 self.setscene_tids_generator = itertools.cycle(self.rqdata.runq_setscene_tids) 2211 2212 task = None 2213 if not self.sqdone and self.can_start_task(): 2214 loopcount = 0 2215 # Find the next setscene to run, exit the loop when we've processed all tids or found something to execute 2216 while loopcount < len(self.rqdata.runq_setscene_tids): 2217 loopcount += 1 2218 nexttask = next(self.setscene_tids_generator) 2219 if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred: 2220 if nexttask in self.sq_deferred and self.sq_deferred[nexttask] not in self.runq_complete: 2221 # Skip deferred tasks quickly before the 'expensive' tests below - this is key to performant multiconfig builds 2222 continue 2223 if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \ 2224 nexttask not in self.sq_needed_harddeps and \ 2225 self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \ 2226 self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): 2227 if nexttask not in self.rqdata.target_tids: 2228 logger.debug2("Skipping setscene for task %s" % nexttask) 2229 self.sq_task_skip(nexttask) 2230 self.scenequeue_notneeded.add(nexttask) 2231 if nexttask in self.sq_deferred: 2232 del self.sq_deferred[nexttask] 2233 return True 2234 if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2235 logger.debug2("Deferring %s due to hard dependencies" % nexttask) 2236 updated = False 2237 for dep in self.sqdata.sq_harddeps_rev[nexttask]: 2238 if dep not in self.sq_needed_harddeps: 2239 logger.debug2("Enabling task %s as it is a hard dependency" % dep) 2240 self.sq_buildable.add(dep) 2241 self.sq_needed_harddeps.add(dep) 2242 updated = True 2243 self.sq_harddep_deferred.add(nexttask) 2244 if updated: 2245 return True 2246 continue 2247 # If covered tasks are running, need to wait for them to complete 2248 for t in self.sqdata.sq_covered_tasks[nexttask]: 2249 if t in self.runq_running and t not in self.runq_complete: 2250 continue 2251 if nexttask in self.sq_deferred: 2252 # Deferred tasks that were still deferred were skipped above so we now need to process 2253 logger.debug("Task %s no longer deferred" % nexttask) 2254 del self.sq_deferred[nexttask] 2255 valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False) 2256 if not valid: 2257 logger.debug("%s didn't become valid, skipping setscene" % nexttask) 2258 self.sq_task_failoutright(nexttask) 2259 return True 2260 if nexttask in self.sqdata.outrightfail: 2261 logger.debug2('No package found, so skipping setscene task %s', nexttask) 2262 self.sq_task_failoutright(nexttask) 2263 return True 2264 if nexttask in self.sqdata.unskippable: 2265 logger.debug2("Setscene task %s is unskippable" % nexttask) 2266 task = nexttask 2267 break 2268 if task is not None: 2269 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2270 taskname = taskname + "_setscene" 2271 if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): 2272 logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task) 2273 self.sq_task_failoutright(task) 2274 return True 2275 2276 if self.cooker.configuration.force: 2277 if task in self.rqdata.target_tids: 2278 self.sq_task_failoutright(task) 2279 return True 2280 2281 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2282 logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task) 2283 self.sq_task_skip(task) 2284 return True 2285 2286 if self.cooker.configuration.skipsetscene: 2287 logger.debug2('No setscene tasks should be executed. Skipping %s', task) 2288 self.sq_task_failoutright(task) 2289 return True 2290 2291 startevent = sceneQueueTaskStarted(task, self.stats, self.rq) 2292 bb.event.fire(startevent, self.cfgData) 2293 2294 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2295 realfn = bb.cache.virtualfn2realfn(taskfn)[0] 2296 runtask = { 2297 'fn' : taskfn, 2298 'task' : task, 2299 'taskname' : taskname, 2300 'taskhash' : self.rqdata.get_task_hash(task), 2301 'unihash' : self.rqdata.get_task_unihash(task), 2302 'quieterrors' : True, 2303 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), 2304 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], 2305 'taskdepdata' : self.sq_build_taskdepdata(task), 2306 'dry_run' : False, 2307 'taskdep': taskdep, 2308 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], 2309 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], 2310 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] 2311 } 2312 2313 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2314 if not mc in self.rq.fakeworker: 2315 self.rq.start_fakeworker(self, mc) 2316 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") 2317 self.rq.fakeworker[mc].process.stdin.flush() 2318 else: 2319 RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") 2320 self.rq.worker[mc].process.stdin.flush() 2321 2322 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2323 self.build_stamps2.append(self.build_stamps[task]) 2324 self.sq_running.add(task) 2325 self.sq_live.add(task) 2326 self.stats.updateActiveSetscene(len(self.sq_live)) 2327 if self.can_start_task(): 2328 return True 2329 2330 self.update_holdofftasks() 2331 2332 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: 2333 hashequiv_logger.verbose("Setscene tasks completed") 2334 2335 err = self.summarise_scenequeue_errors() 2336 if err: 2337 self.rq.state = runQueueFailed 2338 return True 2339 2340 if self.cooker.configuration.setsceneonly: 2341 self.rq.state = runQueueComplete 2342 return True 2343 self.sqdone = True 2344 2345 if self.stats.total == 0: 2346 # nothing to do 2347 self.rq.state = runQueueComplete 2348 return True 2349 2350 if self.cooker.configuration.setsceneonly: 2351 task = None 2352 else: 2353 task = self.sched.next() 2354 if task is not None: 2355 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2356 2357 if self.rqdata.setscene_ignore_tasks is not None: 2358 if self.check_setscene_ignore_tasks(task): 2359 self.task_fail(task, "setscene ignore_tasks") 2360 return True 2361 2362 if task in self.tasks_covered: 2363 logger.debug2("Setscene covered task %s", task) 2364 self.task_skip(task, "covered") 2365 return True 2366 2367 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2368 logger.debug2("Stamp current task %s", task) 2369 2370 self.task_skip(task, "existing") 2371 self.runq_tasksrun.add(task) 2372 return True 2373 2374 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2375 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2376 startevent = runQueueTaskStarted(task, self.stats, self.rq, 2377 noexec=True) 2378 bb.event.fire(startevent, self.cfgData) 2379 self.runq_running.add(task) 2380 self.stats.taskActive() 2381 if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2382 bb.build.make_stamp_mcfn(taskname, taskfn) 2383 self.task_complete(task) 2384 return True 2385 else: 2386 startevent = runQueueTaskStarted(task, self.stats, self.rq) 2387 bb.event.fire(startevent, self.cfgData) 2388 2389 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2390 realfn = bb.cache.virtualfn2realfn(taskfn)[0] 2391 runtask = { 2392 'fn' : taskfn, 2393 'task' : task, 2394 'taskname' : taskname, 2395 'taskhash' : self.rqdata.get_task_hash(task), 2396 'unihash' : self.rqdata.get_task_unihash(task), 2397 'quieterrors' : False, 2398 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), 2399 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], 2400 'taskdepdata' : self.build_taskdepdata(task), 2401 'dry_run' : self.rqdata.setscene_enforce, 2402 'taskdep': taskdep, 2403 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], 2404 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], 2405 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] 2406 } 2407 2408 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2409 if not mc in self.rq.fakeworker: 2410 try: 2411 self.rq.start_fakeworker(self, mc) 2412 except OSError as exc: 2413 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) 2414 self.rq.state = runQueueFailed 2415 self.stats.taskFailed() 2416 return True 2417 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") 2418 self.rq.fakeworker[mc].process.stdin.flush() 2419 else: 2420 RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") 2421 self.rq.worker[mc].process.stdin.flush() 2422 2423 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2424 self.build_stamps2.append(self.build_stamps[task]) 2425 self.runq_running.add(task) 2426 self.stats.taskActive() 2427 if self.can_start_task(): 2428 return True 2429 2430 if self.stats.active > 0 or self.sq_live: 2431 self.rq.read_workers() 2432 return self.rq.active_fds() 2433 2434 # No more tasks can be run. If we have deferred setscene tasks we should run them. 2435 if self.sq_deferred: 2436 deferred_tid = list(self.sq_deferred.keys())[0] 2437 blocking_tid = self.sq_deferred.pop(deferred_tid) 2438 logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid)) 2439 return True 2440 2441 if self.failed_tids: 2442 self.rq.state = runQueueFailed 2443 return True 2444 2445 # Sanity Checks 2446 err = self.summarise_scenequeue_errors() 2447 for task in self.rqdata.runtaskentries: 2448 if task not in self.runq_buildable: 2449 logger.error("Task %s never buildable!", task) 2450 err = True 2451 elif task not in self.runq_running: 2452 logger.error("Task %s never ran!", task) 2453 err = True 2454 elif task not in self.runq_complete: 2455 logger.error("Task %s never completed!", task) 2456 err = True 2457 2458 if err: 2459 self.rq.state = runQueueFailed 2460 else: 2461 self.rq.state = runQueueComplete 2462 2463 return True 2464 2465 def filtermcdeps(self, task, mc, deps): 2466 ret = set() 2467 for dep in deps: 2468 thismc = mc_from_tid(dep) 2469 if thismc != mc: 2470 continue 2471 ret.add(dep) 2472 return ret 2473 2474 # Build the individual cache entries in advance once to save time 2475 def build_taskdepdata_cache(self): 2476 taskdepdata_cache = {} 2477 for task in self.rqdata.runtaskentries: 2478 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2479 taskdepdata_cache[task] = bb.TaskData( 2480 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn], 2481 taskname = taskname, 2482 fn = fn, 2483 deps = self.filtermcdeps(task, mc, self.rqdata.runtaskentries[task].depends), 2484 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn], 2485 taskhash = self.rqdata.runtaskentries[task].hash, 2486 unihash = self.rqdata.runtaskentries[task].unihash, 2487 hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn], 2488 taskhash_deps = self.rqdata.runtaskentries[task].taskhash_deps, 2489 ) 2490 2491 self.taskdepdata_cache = taskdepdata_cache 2492 2493 # We filter out multiconfig dependencies from taskdepdata we pass to the tasks 2494 # as most code can't handle them 2495 def build_taskdepdata(self, task): 2496 taskdepdata = {} 2497 mc = mc_from_tid(task) 2498 next = self.rqdata.runtaskentries[task].depends.copy() 2499 next.add(task) 2500 next = self.filtermcdeps(task, mc, next) 2501 while next: 2502 additional = [] 2503 for revdep in next: 2504 self.taskdepdata_cache[revdep] = self.taskdepdata_cache[revdep]._replace( 2505 unihash=self.rqdata.runtaskentries[revdep].unihash 2506 ) 2507 taskdepdata[revdep] = self.taskdepdata_cache[revdep] 2508 for revdep2 in self.taskdepdata_cache[revdep].deps: 2509 if revdep2 not in taskdepdata: 2510 additional.append(revdep2) 2511 next = additional 2512 2513 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2514 return taskdepdata 2515 2516 def update_holdofftasks(self): 2517 2518 if not self.holdoff_need_update: 2519 return 2520 2521 notcovered = set(self.scenequeue_notcovered) 2522 notcovered |= self.sqdata.cantskip 2523 for tid in self.scenequeue_notcovered: 2524 notcovered |= self.sqdata.sq_covered_tasks[tid] 2525 notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) 2526 notcovered.intersection_update(self.tasks_scenequeue_done) 2527 2528 covered = set(self.scenequeue_covered) 2529 for tid in self.scenequeue_covered: 2530 covered |= self.sqdata.sq_covered_tasks[tid] 2531 covered.difference_update(notcovered) 2532 covered.intersection_update(self.tasks_scenequeue_done) 2533 2534 for tid in notcovered | covered: 2535 if not self.rqdata.runtaskentries[tid].depends: 2536 self.setbuildable(tid) 2537 elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): 2538 self.setbuildable(tid) 2539 2540 self.tasks_covered = covered 2541 self.tasks_notcovered = notcovered 2542 2543 self.holdoff_tasks = set() 2544 2545 for tid in self.rqdata.runq_setscene_tids: 2546 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2547 self.holdoff_tasks.add(tid) 2548 2549 for tid in self.holdoff_tasks.copy(): 2550 for dep in self.sqdata.sq_covered_tasks[tid]: 2551 if dep not in self.runq_complete: 2552 self.holdoff_tasks.add(dep) 2553 2554 self.holdoff_need_update = False 2555 2556 def process_possible_migrations(self): 2557 2558 changed = set() 2559 toprocess = set() 2560 for tid, unihash in self.updated_taskhash_queue.copy(): 2561 if tid in self.runq_running and tid not in self.runq_complete: 2562 continue 2563 2564 self.updated_taskhash_queue.remove((tid, unihash)) 2565 2566 if unihash != self.rqdata.runtaskentries[tid].unihash: 2567 # Make sure we rehash any other tasks with the same task hash that we're deferred against. 2568 torehash = [tid] 2569 for deftid in self.sq_deferred: 2570 if self.sq_deferred[deftid] == tid: 2571 torehash.append(deftid) 2572 for hashtid in torehash: 2573 hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) 2574 self.rqdata.runtaskentries[hashtid].unihash = unihash 2575 bb.parse.siggen.set_unihash(hashtid, unihash) 2576 toprocess.add(hashtid) 2577 2578 # Work out all tasks which depend upon these 2579 total = set() 2580 next = set() 2581 for p in toprocess: 2582 next |= self.rqdata.runtaskentries[p].revdeps 2583 while next: 2584 current = next.copy() 2585 total = total | next 2586 next = set() 2587 for ntid in current: 2588 next |= self.rqdata.runtaskentries[ntid].revdeps 2589 next.difference_update(total) 2590 2591 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash 2592 next = set() 2593 for p in total: 2594 if not self.rqdata.runtaskentries[p].depends: 2595 next.add(p) 2596 elif self.rqdata.runtaskentries[p].depends.isdisjoint(total): 2597 next.add(p) 2598 2599 starttime = time.time() 2600 lasttime = starttime 2601 2602 # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled 2603 while next: 2604 current = next.copy() 2605 next = set() 2606 ready = {} 2607 for tid in current: 2608 if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): 2609 continue 2610 # get_taskhash for a given tid *must* be called before get_unihash* below 2611 ready[tid] = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches) 2612 2613 unihashes = bb.parse.siggen.get_unihashes(ready.keys()) 2614 2615 for tid in ready: 2616 orighash = self.rqdata.runtaskentries[tid].hash 2617 newhash = ready[tid] 2618 origuni = self.rqdata.runtaskentries[tid].unihash 2619 newuni = unihashes[tid] 2620 2621 # FIXME, need to check it can come from sstate at all for determinism? 2622 remapped = False 2623 if newuni == origuni: 2624 # Nothing to do, we match, skip code below 2625 remapped = True 2626 elif tid in self.scenequeue_covered or tid in self.sq_live: 2627 # Already ran this setscene task or it running. Report the new taskhash 2628 bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches) 2629 hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid)) 2630 remapped = True 2631 2632 if not remapped: 2633 #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni)) 2634 self.rqdata.runtaskentries[tid].hash = newhash 2635 self.rqdata.runtaskentries[tid].unihash = newuni 2636 changed.add(tid) 2637 2638 next |= self.rqdata.runtaskentries[tid].revdeps 2639 total.remove(tid) 2640 next.intersection_update(total) 2641 bb.event.check_for_interrupts(self.cooker.data) 2642 2643 if time.time() > (lasttime + 30): 2644 lasttime = time.time() 2645 hashequiv_logger.verbose("Rehash loop slow progress: %s in %s" % (len(total), lasttime - starttime)) 2646 2647 endtime = time.time() 2648 if (endtime-starttime > 60): 2649 hashequiv_logger.verbose("Rehash loop took more than 60s: %s" % (endtime-starttime)) 2650 2651 if changed: 2652 for mc in self.rq.worker: 2653 RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") 2654 for mc in self.rq.fakeworker: 2655 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") 2656 2657 hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) 2658 2659 for tid in changed: 2660 if tid not in self.rqdata.runq_setscene_tids: 2661 continue 2662 if tid not in self.pending_migrations: 2663 self.pending_migrations.add(tid) 2664 2665 update_tasks = [] 2666 for tid in self.pending_migrations.copy(): 2667 if tid in self.runq_running or tid in self.sq_live: 2668 # Too late, task already running, not much we can do now 2669 self.pending_migrations.remove(tid) 2670 continue 2671 2672 valid = True 2673 # Check no tasks this covers are running 2674 for dep in self.sqdata.sq_covered_tasks[tid]: 2675 if dep in self.runq_running and dep not in self.runq_complete: 2676 hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid)) 2677 valid = False 2678 break 2679 if not valid: 2680 continue 2681 2682 self.pending_migrations.remove(tid) 2683 changed = True 2684 2685 if tid in self.tasks_scenequeue_done: 2686 self.tasks_scenequeue_done.remove(tid) 2687 for dep in self.sqdata.sq_covered_tasks[tid]: 2688 if dep in self.runq_complete and dep not in self.runq_tasksrun: 2689 bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep) 2690 self.failed_tids.append(tid) 2691 self.rq.state = runQueueCleanUp 2692 return 2693 2694 if dep not in self.runq_complete: 2695 if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable: 2696 self.tasks_scenequeue_done.remove(dep) 2697 2698 if tid in self.sq_buildable: 2699 self.sq_buildable.remove(tid) 2700 if tid in self.sq_running: 2701 self.sq_running.remove(tid) 2702 if tid in self.sqdata.outrightfail: 2703 self.sqdata.outrightfail.remove(tid) 2704 if tid in self.scenequeue_notcovered: 2705 self.scenequeue_notcovered.remove(tid) 2706 if tid in self.scenequeue_covered: 2707 self.scenequeue_covered.remove(tid) 2708 if tid in self.scenequeue_notneeded: 2709 self.scenequeue_notneeded.remove(tid) 2710 2711 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2712 self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2713 2714 if tid in self.stampcache: 2715 del self.stampcache[tid] 2716 2717 if tid in self.build_stamps: 2718 del self.build_stamps[tid] 2719 2720 update_tasks.append(tid) 2721 2722 update_tasks2 = [] 2723 for tid in update_tasks: 2724 harddepfail = False 2725 for t in self.sqdata.sq_harddeps_rev[tid]: 2726 if t in self.scenequeue_notcovered: 2727 harddepfail = True 2728 break 2729 if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2730 if tid not in self.sq_buildable: 2731 self.sq_buildable.add(tid) 2732 if not self.sqdata.sq_revdeps[tid]: 2733 self.sq_buildable.add(tid) 2734 2735 update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid)) 2736 2737 if update_tasks2: 2738 self.sqdone = False 2739 for mc in sorted(self.sqdata.multiconfigs): 2740 for tid in sorted([t[0] for t in update_tasks2]): 2741 if mc_from_tid(tid) != mc: 2742 continue 2743 h = pending_hash_index(tid, self.rqdata) 2744 if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]: 2745 self.sq_deferred[tid] = self.sqdata.hashes[h] 2746 bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h])) 2747 update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2748 2749 for (tid, harddepfail, origvalid) in update_tasks2: 2750 if tid in self.sqdata.valid and not origvalid: 2751 hashequiv_logger.verbose("Setscene task %s became valid" % tid) 2752 if harddepfail: 2753 logger.debug2("%s has an unavailable hard dependency so skipping" % (tid)) 2754 self.sq_task_failoutright(tid) 2755 2756 if changed: 2757 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2758 self.sq_needed_harddeps = set() 2759 self.sq_harddep_deferred = set() 2760 self.holdoff_need_update = True 2761 2762 def scenequeue_updatecounters(self, task, fail=False): 2763 2764 if fail and task in self.sqdata.sq_harddeps: 2765 for dep in sorted(self.sqdata.sq_harddeps[task]): 2766 if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered: 2767 # dependency could be already processed, e.g. noexec setscene task 2768 continue 2769 noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache) 2770 if noexec or stamppresent: 2771 continue 2772 logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) 2773 self.sq_task_failoutright(dep) 2774 continue 2775 2776 # For performance, only compute allcovered once if needed 2777 if self.sqdata.sq_deps[task]: 2778 allcovered = self.scenequeue_covered | self.scenequeue_notcovered 2779 for dep in sorted(self.sqdata.sq_deps[task]): 2780 if self.sqdata.sq_revdeps[dep].issubset(allcovered): 2781 if dep not in self.sq_buildable: 2782 self.sq_buildable.add(dep) 2783 2784 next = set([task]) 2785 while next: 2786 new = set() 2787 for t in sorted(next): 2788 self.tasks_scenequeue_done.add(t) 2789 # Look down the dependency chain for non-setscene things which this task depends on 2790 # and mark as 'done' 2791 for dep in self.rqdata.runtaskentries[t].depends: 2792 if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: 2793 continue 2794 if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): 2795 new.add(dep) 2796 next = new 2797 2798 # If this task was one which other setscene tasks have a hard dependency upon, we need 2799 # to walk through the hard dependencies and allow execution of those which have completed dependencies. 2800 if task in self.sqdata.sq_harddeps: 2801 for dep in self.sq_harddep_deferred.copy(): 2802 if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2803 self.sq_harddep_deferred.remove(dep) 2804 2805 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2806 self.holdoff_need_update = True 2807 2808 def sq_task_completeoutright(self, task): 2809 """ 2810 Mark a task as completed 2811 Look at the reverse dependencies and mark any task with 2812 completed dependencies as buildable 2813 """ 2814 2815 logger.debug('Found task %s which could be accelerated', task) 2816 self.scenequeue_covered.add(task) 2817 self.scenequeue_updatecounters(task) 2818 2819 def sq_check_taskfail(self, task): 2820 if self.rqdata.setscene_ignore_tasks is not None: 2821 realtask = task.split('_setscene')[0] 2822 (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) 2823 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2824 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2825 logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) 2826 self.rq.state = runQueueCleanUp 2827 2828 def sq_task_complete(self, task): 2829 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2830 self.sq_task_completeoutright(task) 2831 2832 def sq_task_fail(self, task, result): 2833 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData) 2834 self.scenequeue_notcovered.add(task) 2835 self.scenequeue_updatecounters(task, True) 2836 self.sq_check_taskfail(task) 2837 2838 def sq_task_failoutright(self, task): 2839 self.sq_running.add(task) 2840 self.sq_buildable.add(task) 2841 self.scenequeue_notcovered.add(task) 2842 self.scenequeue_updatecounters(task, True) 2843 2844 def sq_task_skip(self, task): 2845 self.sq_running.add(task) 2846 self.sq_buildable.add(task) 2847 self.sq_task_completeoutright(task) 2848 2849 def sq_build_taskdepdata(self, task): 2850 def getsetscenedeps(tid): 2851 deps = set() 2852 (mc, fn, taskname, _) = split_tid_mcfn(tid) 2853 realtid = tid + "_setscene" 2854 idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends 2855 for (depname, idependtask) in idepends: 2856 if depname not in self.rqdata.taskData[mc].build_targets: 2857 continue 2858 2859 depfn = self.rqdata.taskData[mc].build_targets[depname][0] 2860 if depfn is None: 2861 continue 2862 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2863 deps.add(deptid) 2864 return deps 2865 2866 taskdepdata = {} 2867 next = getsetscenedeps(task) 2868 next.add(task) 2869 while next: 2870 additional = [] 2871 for revdep in next: 2872 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2873 deps = getsetscenedeps(revdep) 2874 2875 taskdepdata[revdep] = bb.TaskData( 2876 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn], 2877 taskname = taskname, 2878 fn = fn, 2879 deps = deps, 2880 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn], 2881 taskhash = self.rqdata.runtaskentries[revdep].hash, 2882 unihash = self.rqdata.runtaskentries[revdep].unihash, 2883 hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn], 2884 taskhash_deps = self.rqdata.runtaskentries[revdep].taskhash_deps, 2885 ) 2886 for revdep2 in deps: 2887 if revdep2 not in taskdepdata: 2888 additional.append(revdep2) 2889 next = additional 2890 2891 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2892 return taskdepdata 2893 2894 def check_setscene_ignore_tasks(self, tid): 2895 # Check task that is going to run against the ignore tasks list 2896 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2897 # Ignore covered tasks 2898 if tid in self.tasks_covered: 2899 return False 2900 # Ignore stamped tasks 2901 if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): 2902 return False 2903 # Ignore noexec tasks 2904 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2905 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2906 return False 2907 2908 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2909 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2910 if tid in self.rqdata.runq_setscene_tids: 2911 msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)] 2912 else: 2913 msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)] 2914 for t in self.scenequeue_notcovered: 2915 msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash)) 2916 msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered)) 2917 logger.error("".join(msg)) 2918 return True 2919 return False 2920 2921class SQData(object): 2922 def __init__(self): 2923 # SceneQueue dependencies 2924 self.sq_deps = {} 2925 # SceneQueue reverse dependencies 2926 self.sq_revdeps = {} 2927 # Injected inter-setscene task dependencies 2928 self.sq_harddeps = {} 2929 self.sq_harddeps_rev = {} 2930 # Cache of stamp files so duplicates can't run in parallel 2931 self.stamps = {} 2932 # Setscene tasks directly depended upon by the build 2933 self.unskippable = set() 2934 # List of setscene tasks which aren't present 2935 self.outrightfail = set() 2936 # A list of normal tasks a setscene task covers 2937 self.sq_covered_tasks = {} 2938 2939def build_scenequeue_data(sqdata, rqdata, sqrq): 2940 2941 sq_revdeps = {} 2942 sq_revdeps_squash = {} 2943 sq_collated_deps = {} 2944 2945 # We can't skip specified target tasks which aren't setscene tasks 2946 sqdata.cantskip = set(rqdata.target_tids) 2947 sqdata.cantskip.difference_update(rqdata.runq_setscene_tids) 2948 sqdata.cantskip.intersection_update(rqdata.runtaskentries) 2949 2950 # We need to construct a dependency graph for the setscene functions. Intermediate 2951 # dependencies between the setscene tasks only complicate the code. This code 2952 # therefore aims to collapse the huge runqueue dependency tree into a smaller one 2953 # only containing the setscene functions. 2954 2955 rqdata.init_progress_reporter.next_stage() 2956 2957 # First process the chains up to the first setscene task. 2958 endpoints = {} 2959 for tid in rqdata.runtaskentries: 2960 sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) 2961 sq_revdeps_squash[tid] = set() 2962 if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids: 2963 #bb.warn("Added endpoint %s" % (tid)) 2964 endpoints[tid] = set() 2965 2966 rqdata.init_progress_reporter.next_stage() 2967 2968 # Secondly process the chains between setscene tasks. 2969 for tid in rqdata.runq_setscene_tids: 2970 sq_collated_deps[tid] = set() 2971 #bb.warn("Added endpoint 2 %s" % (tid)) 2972 for dep in rqdata.runtaskentries[tid].depends: 2973 if tid in sq_revdeps[dep]: 2974 sq_revdeps[dep].remove(tid) 2975 if dep not in endpoints: 2976 endpoints[dep] = set() 2977 #bb.warn(" Added endpoint 3 %s" % (dep)) 2978 endpoints[dep].add(tid) 2979 2980 rqdata.init_progress_reporter.next_stage() 2981 2982 def process_endpoints(endpoints): 2983 newendpoints = {} 2984 for point, task in endpoints.items(): 2985 tasks = set() 2986 if task: 2987 tasks |= task 2988 if sq_revdeps_squash[point]: 2989 tasks |= sq_revdeps_squash[point] 2990 if point not in rqdata.runq_setscene_tids: 2991 for t in tasks: 2992 sq_collated_deps[t].add(point) 2993 sq_revdeps_squash[point] = set() 2994 if point in rqdata.runq_setscene_tids: 2995 sq_revdeps_squash[point] = tasks 2996 continue 2997 for dep in rqdata.runtaskentries[point].depends: 2998 if point in sq_revdeps[dep]: 2999 sq_revdeps[dep].remove(point) 3000 if tasks: 3001 sq_revdeps_squash[dep] |= tasks 3002 if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids: 3003 newendpoints[dep] = task 3004 if newendpoints: 3005 process_endpoints(newendpoints) 3006 3007 process_endpoints(endpoints) 3008 3009 rqdata.init_progress_reporter.next_stage() 3010 3011 # Build a list of tasks which are "unskippable" 3012 # These are direct endpoints referenced by the build upto and including setscene tasks 3013 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon 3014 new = True 3015 for tid in rqdata.runtaskentries: 3016 if not rqdata.runtaskentries[tid].revdeps: 3017 sqdata.unskippable.add(tid) 3018 sqdata.unskippable |= sqdata.cantskip 3019 while new: 3020 new = False 3021 orig = sqdata.unskippable.copy() 3022 for tid in sorted(orig, reverse=True): 3023 if tid in rqdata.runq_setscene_tids: 3024 continue 3025 if not rqdata.runtaskentries[tid].depends: 3026 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable 3027 sqrq.setbuildable(tid) 3028 sqdata.unskippable |= rqdata.runtaskentries[tid].depends 3029 if sqdata.unskippable != orig: 3030 new = True 3031 3032 sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids) 3033 3034 rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) 3035 3036 # Sanity check all dependencies could be changed to setscene task references 3037 for tid in rqdata.runtaskentries: 3038 if tid in rqdata.runq_setscene_tids: 3039 pass 3040 elif sq_revdeps_squash[tid]: 3041 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.") 3042 else: 3043 del sq_revdeps_squash[tid] 3044 3045 rqdata.init_progress_reporter.next_stage() 3046 3047 # Resolve setscene inter-task dependencies 3048 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" 3049 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies 3050 for tid in rqdata.runq_setscene_tids: 3051 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 3052 realtid = tid + "_setscene" 3053 idepends = rqdata.taskData[mc].taskentries[realtid].idepends 3054 sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 3055 3056 sqdata.sq_harddeps_rev[tid] = set() 3057 for (depname, idependtask) in idepends: 3058 3059 if depname not in rqdata.taskData[mc].build_targets: 3060 continue 3061 3062 depfn = rqdata.taskData[mc].build_targets[depname][0] 3063 if depfn is None: 3064 continue 3065 deptid = depfn + ":" + idependtask.replace("_setscene", "") 3066 if deptid not in rqdata.runtaskentries: 3067 bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) 3068 3069 logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid)) 3070 3071 if not deptid in sqdata.sq_harddeps: 3072 sqdata.sq_harddeps[deptid] = set() 3073 sqdata.sq_harddeps[deptid].add(tid) 3074 sqdata.sq_harddeps_rev[tid].add(deptid) 3075 3076 rqdata.init_progress_reporter.next_stage() 3077 3078 rqdata.init_progress_reporter.next_stage() 3079 3080 #for tid in sq_revdeps_squash: 3081 # data = "" 3082 # for dep in sq_revdeps_squash[tid]: 3083 # data = data + "\n %s" % dep 3084 # bb.warn("Task %s_setscene: is %s " % (tid, data)) 3085 3086 sqdata.sq_revdeps = sq_revdeps_squash 3087 sqdata.sq_covered_tasks = sq_collated_deps 3088 3089 # Build reverse version of revdeps to populate deps structure 3090 for tid in sqdata.sq_revdeps: 3091 sqdata.sq_deps[tid] = set() 3092 for tid in sqdata.sq_revdeps: 3093 for dep in sqdata.sq_revdeps[tid]: 3094 sqdata.sq_deps[dep].add(tid) 3095 3096 rqdata.init_progress_reporter.next_stage() 3097 3098 sqdata.multiconfigs = set() 3099 for tid in sqdata.sq_revdeps: 3100 sqdata.multiconfigs.add(mc_from_tid(tid)) 3101 if not sqdata.sq_revdeps[tid]: 3102 sqrq.sq_buildable.add(tid) 3103 3104 rqdata.init_progress_reporter.next_stage() 3105 3106 sqdata.noexec = set() 3107 sqdata.stamppresent = set() 3108 sqdata.valid = set() 3109 3110 sqdata.hashes = {} 3111 sqrq.sq_deferred = {} 3112 for mc in sorted(sqdata.multiconfigs): 3113 for tid in sorted(sqdata.sq_revdeps): 3114 if mc_from_tid(tid) != mc: 3115 continue 3116 h = pending_hash_index(tid, rqdata) 3117 if h not in sqdata.hashes: 3118 sqdata.hashes[h] = tid 3119 else: 3120 sqrq.sq_deferred[tid] = sqdata.hashes[h] 3121 bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h])) 3122 3123def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False): 3124 3125 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 3126 3127 taskdep = rqdata.dataCaches[mc].task_deps[taskfn] 3128 3129 if 'noexec' in taskdep and taskname in taskdep['noexec']: 3130 bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn) 3131 return True, False 3132 3133 if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache): 3134 logger.debug2('Setscene stamp current for task %s', tid) 3135 return False, True 3136 3137 if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache): 3138 logger.debug2('Normal stamp current for task %s', tid) 3139 return False, True 3140 3141 return False, False 3142 3143def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): 3144 3145 tocheck = set() 3146 3147 for tid in sorted(tids): 3148 if tid in sqdata.stamppresent: 3149 sqdata.stamppresent.remove(tid) 3150 if tid in sqdata.valid: 3151 sqdata.valid.remove(tid) 3152 if tid in sqdata.outrightfail: 3153 sqdata.outrightfail.remove(tid) 3154 3155 noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True) 3156 3157 if noexec: 3158 sqdata.noexec.add(tid) 3159 sqrq.sq_task_skip(tid) 3160 logger.debug2("%s is noexec so skipping setscene" % (tid)) 3161 continue 3162 3163 if stamppresent: 3164 sqdata.stamppresent.add(tid) 3165 sqrq.sq_task_skip(tid) 3166 logger.debug2("%s has a valid stamp, skipping" % (tid)) 3167 continue 3168 3169 tocheck.add(tid) 3170 3171 sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) 3172 3173 for tid in tids: 3174 if tid in sqdata.stamppresent: 3175 continue 3176 if tid in sqdata.valid: 3177 continue 3178 if tid in sqdata.noexec: 3179 continue 3180 if tid in sqrq.scenequeue_covered: 3181 continue 3182 if tid in sqrq.scenequeue_notcovered: 3183 continue 3184 if tid in sqrq.sq_deferred: 3185 continue 3186 sqdata.outrightfail.add(tid) 3187 logger.debug2("%s already handled (fallthrough), skipping" % (tid)) 3188 3189class TaskFailure(Exception): 3190 """ 3191 Exception raised when a task in a runqueue fails 3192 """ 3193 def __init__(self, x): 3194 self.args = x 3195 3196 3197class runQueueExitWait(bb.event.Event): 3198 """ 3199 Event when waiting for task processes to exit 3200 """ 3201 3202 def __init__(self, remain): 3203 self.remain = remain 3204 self.message = "Waiting for %s active tasks to finish" % remain 3205 bb.event.Event.__init__(self) 3206 3207class runQueueEvent(bb.event.Event): 3208 """ 3209 Base runQueue event class 3210 """ 3211 def __init__(self, task, stats, rq): 3212 self.taskid = task 3213 self.taskstring = task 3214 self.taskname = taskname_from_tid(task) 3215 self.taskfile = fn_from_tid(task) 3216 self.taskhash = rq.rqdata.get_task_hash(task) 3217 self.stats = stats.copy() 3218 bb.event.Event.__init__(self) 3219 3220class sceneQueueEvent(runQueueEvent): 3221 """ 3222 Base sceneQueue event class 3223 """ 3224 def __init__(self, task, stats, rq, noexec=False): 3225 runQueueEvent.__init__(self, task, stats, rq) 3226 self.taskstring = task + "_setscene" 3227 self.taskname = taskname_from_tid(task) + "_setscene" 3228 self.taskfile = fn_from_tid(task) 3229 self.taskhash = rq.rqdata.get_task_hash(task) 3230 3231class runQueueTaskStarted(runQueueEvent): 3232 """ 3233 Event notifying a task was started 3234 """ 3235 def __init__(self, task, stats, rq, noexec=False): 3236 runQueueEvent.__init__(self, task, stats, rq) 3237 self.noexec = noexec 3238 3239class sceneQueueTaskStarted(sceneQueueEvent): 3240 """ 3241 Event notifying a setscene task was started 3242 """ 3243 def __init__(self, task, stats, rq, noexec=False): 3244 sceneQueueEvent.__init__(self, task, stats, rq) 3245 self.noexec = noexec 3246 3247class runQueueTaskFailed(runQueueEvent): 3248 """ 3249 Event notifying a task failed 3250 """ 3251 def __init__(self, task, stats, exitcode, rq, fakeroot_log=None): 3252 runQueueEvent.__init__(self, task, stats, rq) 3253 self.exitcode = exitcode 3254 self.fakeroot_log = fakeroot_log 3255 3256 def __str__(self): 3257 if self.fakeroot_log: 3258 return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log) 3259 else: 3260 return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) 3261 3262class sceneQueueTaskFailed(sceneQueueEvent): 3263 """ 3264 Event notifying a setscene task failed 3265 """ 3266 def __init__(self, task, stats, exitcode, rq): 3267 sceneQueueEvent.__init__(self, task, stats, rq) 3268 self.exitcode = exitcode 3269 3270 def __str__(self): 3271 return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode) 3272 3273class sceneQueueComplete(sceneQueueEvent): 3274 """ 3275 Event when all the sceneQueue tasks are complete 3276 """ 3277 def __init__(self, stats, rq): 3278 self.stats = stats.copy() 3279 bb.event.Event.__init__(self) 3280 3281class runQueueTaskCompleted(runQueueEvent): 3282 """ 3283 Event notifying a task completed 3284 """ 3285 3286class sceneQueueTaskCompleted(sceneQueueEvent): 3287 """ 3288 Event notifying a setscene task completed 3289 """ 3290 3291class runQueueTaskSkipped(runQueueEvent): 3292 """ 3293 Event notifying a task was skipped 3294 """ 3295 def __init__(self, task, stats, rq, reason): 3296 runQueueEvent.__init__(self, task, stats, rq) 3297 self.reason = reason 3298 3299class taskUniHashUpdate(bb.event.Event): 3300 """ 3301 Base runQueue event class 3302 """ 3303 def __init__(self, task, unihash): 3304 self.taskid = task 3305 self.unihash = unihash 3306 bb.event.Event.__init__(self) 3307 3308class runQueuePipe(): 3309 """ 3310 Abstraction for a pipe between a worker thread and the server 3311 """ 3312 def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None): 3313 self.input = pipein 3314 if pipeout: 3315 pipeout.close() 3316 bb.utils.nonblockingfd(self.input) 3317 self.queue = bytearray() 3318 self.d = d 3319 self.rq = rq 3320 self.rqexec = rqexec 3321 self.fakerootlogs = fakerootlogs 3322 3323 def read(self): 3324 for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]: 3325 for worker in workers.values(): 3326 worker.process.poll() 3327 if worker.process.returncode is not None and not self.rq.teardown: 3328 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode))) 3329 self.rq.finish_runqueue(True) 3330 3331 start = len(self.queue) 3332 try: 3333 self.queue.extend(self.input.read(512 * 1024) or b"") 3334 except (OSError, IOError) as e: 3335 if e.errno != errno.EAGAIN: 3336 raise 3337 end = len(self.queue) 3338 found = True 3339 while found and self.queue: 3340 found = False 3341 index = self.queue.find(b"</event>") 3342 while index != -1 and self.queue.startswith(b"<event>"): 3343 try: 3344 event = pickle.loads(self.queue[7:index]) 3345 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3346 if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e): 3347 # The pickled data could contain "</event>" so search for the next occurance 3348 # unpickling again, this should be the only way an unpickle error could occur 3349 index = self.queue.find(b"</event>", index + 1) 3350 continue 3351 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) 3352 bb.event.fire_from_worker(event, self.d) 3353 if isinstance(event, taskUniHashUpdate): 3354 self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) 3355 found = True 3356 self.queue = self.queue[index+8:] 3357 index = self.queue.find(b"</event>") 3358 index = self.queue.find(b"</exitcode>") 3359 while index != -1 and self.queue.startswith(b"<exitcode>"): 3360 try: 3361 task, status = pickle.loads(self.queue[10:index]) 3362 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3363 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) 3364 (_, _, _, taskfn) = split_tid_mcfn(task) 3365 fakerootlog = None 3366 if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs: 3367 fakerootlog = self.fakerootlogs[taskfn] 3368 self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog) 3369 found = True 3370 self.queue = self.queue[index+11:] 3371 index = self.queue.find(b"</exitcode>") 3372 return (end > start) 3373 3374 def close(self): 3375 while self.read(): 3376 continue 3377 if self.queue: 3378 print("Warning, worker left partial message: %s" % self.queue) 3379 self.input.close() 3380 3381def get_setscene_enforce_ignore_tasks(d, targets): 3382 if d.getVar('BB_SETSCENE_ENFORCE') != '1': 3383 return None 3384 ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split() 3385 outlist = [] 3386 for item in ignore_tasks[:]: 3387 if item.startswith('%:'): 3388 for (mc, target, task, fn) in targets: 3389 outlist.append(target + ':' + item.split(':')[1]) 3390 else: 3391 outlist.append(item) 3392 return outlist 3393 3394def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks): 3395 import fnmatch 3396 if ignore_tasks is not None: 3397 item = '%s:%s' % (pn, taskname) 3398 for ignore_tasks in ignore_tasks: 3399 if fnmatch.fnmatch(item, ignore_tasks): 3400 return True 3401 return False 3402 return True 3403