1""" 2BitBake 'RunQueue' implementation 3 4Handles preparation and execution of a queue of tasks 5""" 6 7# Copyright (C) 2006-2007 Richard Purdie 8# 9# SPDX-License-Identifier: GPL-2.0-only 10# 11 12import copy 13import os 14import sys 15import stat 16import errno 17import logging 18import re 19import bb 20from bb import msg, event 21from bb import monitordisk 22import subprocess 23import pickle 24from multiprocessing import Process 25import shlex 26import pprint 27import time 28 29bblogger = logging.getLogger("BitBake") 30logger = logging.getLogger("BitBake.RunQueue") 31hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv") 32 33__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' ) 34 35def fn_from_tid(tid): 36 return tid.rsplit(":", 1)[0] 37 38def taskname_from_tid(tid): 39 return tid.rsplit(":", 1)[1] 40 41def mc_from_tid(tid): 42 if tid.startswith('mc:') and tid.count(':') >= 2: 43 return tid.split(':')[1] 44 return "" 45 46def split_tid(tid): 47 (mc, fn, taskname, _) = split_tid_mcfn(tid) 48 return (mc, fn, taskname) 49 50def split_mc(n): 51 if n.startswith("mc:") and n.count(':') >= 2: 52 _, mc, n = n.split(":", 2) 53 return (mc, n) 54 return ('', n) 55 56def split_tid_mcfn(tid): 57 if tid.startswith('mc:') and tid.count(':') >= 2: 58 elems = tid.split(':') 59 mc = elems[1] 60 fn = ":".join(elems[2:-1]) 61 taskname = elems[-1] 62 mcfn = "mc:" + mc + ":" + fn 63 else: 64 tid = tid.rsplit(":", 1) 65 mc = "" 66 fn = tid[0] 67 taskname = tid[1] 68 mcfn = fn 69 70 return (mc, fn, taskname, mcfn) 71 72def build_tid(mc, fn, taskname): 73 if mc: 74 return "mc:" + mc + ":" + fn + ":" + taskname 75 return fn + ":" + taskname 76 77# Index used to pair up potentially matching multiconfig tasks 78# We match on PN, taskname and hash being equal 79def pending_hash_index(tid, rqdata): 80 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 81 pn = rqdata.dataCaches[mc].pkg_fn[taskfn] 82 h = rqdata.runtaskentries[tid].unihash 83 return pn + ":" + "taskname" + h 84 85class RunQueueStats: 86 """ 87 Holds statistics on the tasks handled by the associated runQueue 88 """ 89 def __init__(self, total, setscene_total): 90 self.completed = 0 91 self.skipped = 0 92 self.failed = 0 93 self.active = 0 94 self.setscene_active = 0 95 self.setscene_covered = 0 96 self.setscene_notcovered = 0 97 self.setscene_total = setscene_total 98 self.total = total 99 100 def copy(self): 101 obj = self.__class__(self.total, self.setscene_total) 102 obj.__dict__.update(self.__dict__) 103 return obj 104 105 def taskFailed(self): 106 self.active = self.active - 1 107 self.failed = self.failed + 1 108 109 def taskCompleted(self): 110 self.active = self.active - 1 111 self.completed = self.completed + 1 112 113 def taskSkipped(self): 114 self.active = self.active + 1 115 self.skipped = self.skipped + 1 116 117 def taskActive(self): 118 self.active = self.active + 1 119 120 def updateCovered(self, covered, notcovered): 121 self.setscene_covered = covered 122 self.setscene_notcovered = notcovered 123 124 def updateActiveSetscene(self, active): 125 self.setscene_active = active 126 127# These values indicate the next step due to be run in the 128# runQueue state machine 129runQueuePrepare = 2 130runQueueSceneInit = 3 131runQueueRunning = 6 132runQueueFailed = 7 133runQueueCleanUp = 8 134runQueueComplete = 9 135 136class RunQueueScheduler(object): 137 """ 138 Control the order tasks are scheduled in. 139 """ 140 name = "basic" 141 142 def __init__(self, runqueue, rqdata): 143 """ 144 The default scheduler just returns the first buildable task (the 145 priority map is sorted by task number) 146 """ 147 self.rq = runqueue 148 self.rqdata = rqdata 149 self.numTasks = len(self.rqdata.runtaskentries) 150 151 self.prio_map = [self.rqdata.runtaskentries.keys()] 152 153 self.buildable = set() 154 self.skip_maxthread = {} 155 self.stamps = {} 156 for tid in self.rqdata.runtaskentries: 157 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 158 self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 159 if tid in self.rq.runq_buildable: 160 self.buildable.add(tid) 161 162 self.rev_prio_map = None 163 self.is_pressure_usable() 164 165 def is_pressure_usable(self): 166 """ 167 If monitoring pressure, return True if pressure files can be open and read. For example 168 openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported) 169 is returned. 170 """ 171 if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure: 172 try: 173 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 174 open("/proc/pressure/io") as io_pressure_fds, \ 175 open("/proc/pressure/memory") as memory_pressure_fds: 176 177 self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 178 self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 179 self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 180 self.prev_pressure_time = time.time() 181 self.check_pressure = True 182 except: 183 bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure") 184 self.check_pressure = False 185 else: 186 self.check_pressure = False 187 188 def exceeds_max_pressure(self): 189 """ 190 Monitor the difference in total pressure at least once per second, if 191 BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold. 192 """ 193 if self.check_pressure: 194 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 195 open("/proc/pressure/io") as io_pressure_fds, \ 196 open("/proc/pressure/memory") as memory_pressure_fds: 197 # extract "total" from /proc/pressure/{cpu|io} 198 curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 199 curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 200 curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 201 now = time.time() 202 tdiff = now - self.prev_pressure_time 203 psi_accumulation_interval = 1.0 204 cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff 205 io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff 206 memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff 207 exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure 208 exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure 209 exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure 210 211 if tdiff > psi_accumulation_interval: 212 self.prev_cpu_pressure = curr_cpu_pressure 213 self.prev_io_pressure = curr_io_pressure 214 self.prev_memory_pressure = curr_memory_pressure 215 self.prev_pressure_time = now 216 217 pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure) 218 pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure) 219 if hasattr(self, "pressure_state") and pressure_state != self.pressure_state: 220 bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))) 221 self.pressure_state = pressure_state 222 return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure) 223 elif self.rq.max_loadfactor: 224 limit = False 225 loadfactor = float(os.getloadavg()[0]) / os.cpu_count() 226 # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor)) 227 if loadfactor > self.rq.max_loadfactor: 228 limit = True 229 if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit: 230 bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)) 231 self.loadfactor_limit = limit 232 return limit 233 return False 234 235 def next_buildable_task(self): 236 """ 237 Return the id of the first task we find that is buildable 238 """ 239 # Once tasks are running we don't need to worry about them again 240 self.buildable.difference_update(self.rq.runq_running) 241 buildable = set(self.buildable) 242 buildable.difference_update(self.rq.holdoff_tasks) 243 buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered) 244 if not buildable: 245 return None 246 247 # Bitbake requires that at least one task be active. Only check for pressure if 248 # this is the case, otherwise the pressure limitation could result in no tasks 249 # being active and no new tasks started thereby, at times, breaking the scheduler. 250 if self.rq.stats.active and self.exceeds_max_pressure(): 251 return None 252 253 # Filter out tasks that have a max number of threads that have been exceeded 254 skip_buildable = {} 255 for running in self.rq.runq_running.difference(self.rq.runq_complete): 256 rtaskname = taskname_from_tid(running) 257 if rtaskname not in self.skip_maxthread: 258 self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads") 259 if not self.skip_maxthread[rtaskname]: 260 continue 261 if rtaskname in skip_buildable: 262 skip_buildable[rtaskname] += 1 263 else: 264 skip_buildable[rtaskname] = 1 265 266 if len(buildable) == 1: 267 tid = buildable.pop() 268 taskname = taskname_from_tid(tid) 269 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 270 return None 271 stamp = self.stamps[tid] 272 if stamp not in self.rq.build_stamps.values(): 273 return tid 274 275 if not self.rev_prio_map: 276 self.rev_prio_map = {} 277 for tid in self.rqdata.runtaskentries: 278 self.rev_prio_map[tid] = self.prio_map.index(tid) 279 280 best = None 281 bestprio = None 282 for tid in buildable: 283 prio = self.rev_prio_map[tid] 284 if bestprio is None or bestprio > prio: 285 taskname = taskname_from_tid(tid) 286 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 287 continue 288 stamp = self.stamps[tid] 289 if stamp in self.rq.build_stamps.values(): 290 continue 291 bestprio = prio 292 best = tid 293 294 return best 295 296 def next(self): 297 """ 298 Return the id of the task we should build next 299 """ 300 if self.rq.can_start_task(): 301 return self.next_buildable_task() 302 303 def newbuildable(self, task): 304 self.buildable.add(task) 305 306 def removebuildable(self, task): 307 self.buildable.remove(task) 308 309 def describe_task(self, taskid): 310 result = 'ID %s' % taskid 311 if self.rev_prio_map: 312 result = result + (' pri %d' % self.rev_prio_map[taskid]) 313 return result 314 315 def dump_prio(self, comment): 316 bb.debug(3, '%s (most important first):\n%s' % 317 (comment, 318 '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for 319 index, taskid in enumerate(self.prio_map)]))) 320 321class RunQueueSchedulerSpeed(RunQueueScheduler): 322 """ 323 A scheduler optimised for speed. The priority map is sorted by task weight, 324 heavier weighted tasks (tasks needed by the most other tasks) are run first. 325 """ 326 name = "speed" 327 328 def __init__(self, runqueue, rqdata): 329 """ 330 The priority map is sorted by task weight. 331 """ 332 RunQueueScheduler.__init__(self, runqueue, rqdata) 333 334 weights = {} 335 for tid in self.rqdata.runtaskentries: 336 weight = self.rqdata.runtaskentries[tid].weight 337 if not weight in weights: 338 weights[weight] = [] 339 weights[weight].append(tid) 340 341 self.prio_map = [] 342 for weight in sorted(weights): 343 for w in weights[weight]: 344 self.prio_map.append(w) 345 346 self.prio_map.reverse() 347 348class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): 349 """ 350 A scheduler optimised to complete .bb files as quickly as possible. The 351 priority map is sorted by task weight, but then reordered so once a given 352 .bb file starts to build, it's completed as quickly as possible by 353 running all tasks related to the same .bb file one after the after. 354 This works well where disk space is at a premium and classes like OE's 355 rm_work are in force. 356 """ 357 name = "completion" 358 359 def __init__(self, runqueue, rqdata): 360 super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata) 361 362 # Extract list of tasks for each recipe, with tasks sorted 363 # ascending from "must run first" (typically do_fetch) to 364 # "runs last" (do_build). The speed scheduler prioritizes 365 # tasks that must run first before the ones that run later; 366 # this is what we depend on here. 367 task_lists = {} 368 for taskid in self.prio_map: 369 fn, taskname = taskid.rsplit(':', 1) 370 task_lists.setdefault(fn, []).append(taskname) 371 372 # Now unify the different task lists. The strategy is that 373 # common tasks get skipped and new ones get inserted after the 374 # preceeding common one(s) as they are found. Because task 375 # lists should differ only by their number of tasks, but not 376 # the ordering of the common tasks, this should result in a 377 # deterministic result that is a superset of the individual 378 # task ordering. 379 all_tasks = [] 380 for recipe, new_tasks in task_lists.items(): 381 index = 0 382 old_task = all_tasks[index] if index < len(all_tasks) else None 383 for new_task in new_tasks: 384 if old_task == new_task: 385 # Common task, skip it. This is the fast-path which 386 # avoids a full search. 387 index += 1 388 old_task = all_tasks[index] if index < len(all_tasks) else None 389 else: 390 try: 391 index = all_tasks.index(new_task) 392 # Already present, just not at the current 393 # place. We re-synchronized by changing the 394 # index so that it matches again. Now 395 # move on to the next existing task. 396 index += 1 397 old_task = all_tasks[index] if index < len(all_tasks) else None 398 except ValueError: 399 # Not present. Insert before old_task, which 400 # remains the same (but gets shifted back). 401 all_tasks.insert(index, new_task) 402 index += 1 403 bb.debug(3, 'merged task list: %s' % all_tasks) 404 405 # Now reverse the order so that tasks that finish the work on one 406 # recipe are considered more imporant (= come first). The ordering 407 # is now so that do_build is most important. 408 all_tasks.reverse() 409 410 # Group tasks of the same kind before tasks of less important 411 # kinds at the head of the queue (because earlier = lower 412 # priority number = runs earlier), while preserving the 413 # ordering by recipe. If recipe foo is more important than 414 # bar, then the goal is to work on foo's do_populate_sysroot 415 # before bar's do_populate_sysroot and on the more important 416 # tasks of foo before any of the less important tasks in any 417 # other recipe (if those other recipes are more important than 418 # foo). 419 # 420 # All of this only applies when tasks are runable. Explicit 421 # dependencies still override this ordering by priority. 422 # 423 # Here's an example why this priority re-ordering helps with 424 # minimizing disk usage. Consider a recipe foo with a higher 425 # priority than bar where foo DEPENDS on bar. Then the 426 # implicit rule (from base.bbclass) is that foo's do_configure 427 # depends on bar's do_populate_sysroot. This ensures that 428 # bar's do_populate_sysroot gets done first. Normally the 429 # tasks from foo would continue to run once that is done, and 430 # bar only gets completed and cleaned up later. By ordering 431 # bar's task that depend on bar's do_populate_sysroot before foo's 432 # do_configure, that problem gets avoided. 433 task_index = 0 434 self.dump_prio('original priorities') 435 for task in all_tasks: 436 for index in range(task_index, self.numTasks): 437 taskid = self.prio_map[index] 438 taskname = taskid.rsplit(':', 1)[1] 439 if taskname == task: 440 del self.prio_map[index] 441 self.prio_map.insert(task_index, taskid) 442 task_index += 1 443 self.dump_prio('completion priorities') 444 445class RunTaskEntry(object): 446 def __init__(self): 447 self.depends = set() 448 self.revdeps = set() 449 self.hash = None 450 self.unihash = None 451 self.task = None 452 self.weight = 1 453 454class RunQueueData: 455 """ 456 BitBake Run Queue implementation 457 """ 458 def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets): 459 self.cooker = cooker 460 self.dataCaches = dataCaches 461 self.taskData = taskData 462 self.targets = targets 463 self.rq = rq 464 self.warn_multi_bb = False 465 466 self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split() 467 self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets) 468 self.setscene_ignore_tasks_checked = False 469 self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1") 470 self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter() 471 472 self.reset() 473 474 def reset(self): 475 self.runtaskentries = {} 476 477 def runq_depends_names(self, ids): 478 import re 479 ret = [] 480 for id in ids: 481 nam = os.path.basename(id) 482 nam = re.sub("_[^,]*,", ",", nam) 483 ret.extend([nam]) 484 return ret 485 486 def get_task_hash(self, tid): 487 return self.runtaskentries[tid].hash 488 489 def get_task_unihash(self, tid): 490 return self.runtaskentries[tid].unihash 491 492 def get_user_idstring(self, tid, task_name_suffix = ""): 493 return tid + task_name_suffix 494 495 def get_short_user_idstring(self, task, task_name_suffix = ""): 496 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 497 pn = self.dataCaches[mc].pkg_fn[taskfn] 498 taskname = taskname_from_tid(task) + task_name_suffix 499 return "%s:%s" % (pn, taskname) 500 501 def circular_depchains_handler(self, tasks): 502 """ 503 Some tasks aren't buildable, likely due to circular dependency issues. 504 Identify the circular dependencies and print them in a user readable format. 505 """ 506 from copy import deepcopy 507 508 valid_chains = [] 509 explored_deps = {} 510 msgs = [] 511 512 class TooManyLoops(Exception): 513 pass 514 515 def chain_reorder(chain): 516 """ 517 Reorder a dependency chain so the lowest task id is first 518 """ 519 lowest = 0 520 new_chain = [] 521 for entry in range(len(chain)): 522 if chain[entry] < chain[lowest]: 523 lowest = entry 524 new_chain.extend(chain[lowest:]) 525 new_chain.extend(chain[:lowest]) 526 return new_chain 527 528 def chain_compare_equal(chain1, chain2): 529 """ 530 Compare two dependency chains and see if they're the same 531 """ 532 if len(chain1) != len(chain2): 533 return False 534 for index in range(len(chain1)): 535 if chain1[index] != chain2[index]: 536 return False 537 return True 538 539 def chain_array_contains(chain, chain_array): 540 """ 541 Return True if chain_array contains chain 542 """ 543 for ch in chain_array: 544 if chain_compare_equal(ch, chain): 545 return True 546 return False 547 548 def find_chains(tid, prev_chain): 549 prev_chain.append(tid) 550 total_deps = [] 551 total_deps.extend(self.runtaskentries[tid].revdeps) 552 for revdep in self.runtaskentries[tid].revdeps: 553 if revdep in prev_chain: 554 idx = prev_chain.index(revdep) 555 # To prevent duplicates, reorder the chain to start with the lowest taskid 556 # and search through an array of those we've already printed 557 chain = prev_chain[idx:] 558 new_chain = chain_reorder(chain) 559 if not chain_array_contains(new_chain, valid_chains): 560 valid_chains.append(new_chain) 561 msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) 562 for dep in new_chain: 563 msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends))) 564 msgs.append("\n") 565 if len(valid_chains) > 10: 566 msgs.append("Halted dependency loops search after 10 matches.\n") 567 raise TooManyLoops 568 continue 569 scan = False 570 if revdep not in explored_deps: 571 scan = True 572 elif revdep in explored_deps[revdep]: 573 scan = True 574 else: 575 for dep in prev_chain: 576 if dep in explored_deps[revdep]: 577 scan = True 578 if scan: 579 find_chains(revdep, copy.deepcopy(prev_chain)) 580 for dep in explored_deps[revdep]: 581 if dep not in total_deps: 582 total_deps.append(dep) 583 584 explored_deps[tid] = total_deps 585 586 try: 587 for task in tasks: 588 find_chains(task, []) 589 except TooManyLoops: 590 pass 591 592 return msgs 593 594 def calculate_task_weights(self, endpoints): 595 """ 596 Calculate a number representing the "weight" of each task. Heavier weighted tasks 597 have more dependencies and hence should be executed sooner for maximum speed. 598 599 This function also sanity checks the task list finding tasks that are not 600 possible to execute due to circular dependencies. 601 """ 602 603 numTasks = len(self.runtaskentries) 604 weight = {} 605 deps_left = {} 606 task_done = {} 607 608 for tid in self.runtaskentries: 609 task_done[tid] = False 610 weight[tid] = 1 611 deps_left[tid] = len(self.runtaskentries[tid].revdeps) 612 613 for tid in endpoints: 614 weight[tid] = 10 615 task_done[tid] = True 616 617 while True: 618 next_points = [] 619 for tid in endpoints: 620 for revdep in self.runtaskentries[tid].depends: 621 weight[revdep] = weight[revdep] + weight[tid] 622 deps_left[revdep] = deps_left[revdep] - 1 623 if deps_left[revdep] == 0: 624 next_points.append(revdep) 625 task_done[revdep] = True 626 endpoints = next_points 627 if not next_points: 628 break 629 630 # Circular dependency sanity check 631 problem_tasks = [] 632 for tid in self.runtaskentries: 633 if task_done[tid] is False or deps_left[tid] != 0: 634 problem_tasks.append(tid) 635 logger.debug2("Task %s is not buildable", tid) 636 logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) 637 self.runtaskentries[tid].weight = weight[tid] 638 639 if problem_tasks: 640 message = "%s unbuildable tasks were found.\n" % len(problem_tasks) 641 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" 642 message = message + "Identifying dependency loops (this may take a short while)...\n" 643 logger.error(message) 644 645 msgs = self.circular_depchains_handler(problem_tasks) 646 647 message = "\n" 648 for msg in msgs: 649 message = message + msg 650 bb.msg.fatal("RunQueue", message) 651 652 return weight 653 654 def prepare(self): 655 """ 656 Turn a set of taskData into a RunQueue and compute data needed 657 to optimise the execution order. 658 """ 659 660 runq_build = {} 661 recursivetasks = {} 662 recursiveitasks = {} 663 recursivetasksselfref = set() 664 665 taskData = self.taskData 666 667 found = False 668 for mc in self.taskData: 669 if taskData[mc].taskentries: 670 found = True 671 break 672 if not found: 673 # Nothing to do 674 return 0 675 676 bb.parse.siggen.setup_datacache(self.dataCaches) 677 678 self.init_progress_reporter.start() 679 self.init_progress_reporter.next_stage() 680 bb.event.check_for_interrupts(self.cooker.data) 681 682 # Step A - Work out a list of tasks to run 683 # 684 # Taskdata gives us a list of possible providers for every build and run 685 # target ordered by priority. It also gives information on each of those 686 # providers. 687 # 688 # To create the actual list of tasks to execute we fix the list of 689 # providers and then resolve the dependencies into task IDs. This 690 # process is repeated for each type of dependency (tdepends, deptask, 691 # rdeptast, recrdeptask, idepends). 692 693 def add_build_dependencies(depids, tasknames, depends, mc): 694 for depname in depids: 695 # Won't be in build_targets if ASSUME_PROVIDED 696 if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]: 697 continue 698 depdata = taskData[mc].build_targets[depname][0] 699 if depdata is None: 700 continue 701 for taskname in tasknames: 702 t = depdata + ":" + taskname 703 if t in taskData[mc].taskentries: 704 depends.add(t) 705 706 def add_runtime_dependencies(depids, tasknames, depends, mc): 707 for depname in depids: 708 if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]: 709 continue 710 depdata = taskData[mc].run_targets[depname][0] 711 if depdata is None: 712 continue 713 for taskname in tasknames: 714 t = depdata + ":" + taskname 715 if t in taskData[mc].taskentries: 716 depends.add(t) 717 718 def add_mc_dependencies(mc, tid): 719 mcdeps = taskData[mc].get_mcdepends() 720 for dep in mcdeps: 721 mcdependency = dep.split(':') 722 pn = mcdependency[3] 723 frommc = mcdependency[1] 724 mcdep = mcdependency[2] 725 deptask = mcdependency[4] 726 if mcdep not in taskData: 727 bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep)) 728 if mc == frommc: 729 fn = taskData[mcdep].build_targets[pn][0] 730 newdep = '%s:%s' % (fn,deptask) 731 taskData[mc].taskentries[tid].tdepends.append(newdep) 732 733 for mc in taskData: 734 for tid in taskData[mc].taskentries: 735 736 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 737 #runtid = build_tid(mc, fn, taskname) 738 739 #logger.debug2("Processing %s,%s:%s", mc, fn, taskname) 740 741 depends = set() 742 task_deps = self.dataCaches[mc].task_deps[taskfn] 743 744 self.runtaskentries[tid] = RunTaskEntry() 745 746 if fn in taskData[mc].failed_fns: 747 continue 748 749 # We add multiconfig dependencies before processing internal task deps (tdepends) 750 if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']: 751 add_mc_dependencies(mc, tid) 752 753 # Resolve task internal dependencies 754 # 755 # e.g. addtask before X after Y 756 for t in taskData[mc].taskentries[tid].tdepends: 757 (depmc, depfn, deptaskname, _) = split_tid_mcfn(t) 758 depends.add(build_tid(depmc, depfn, deptaskname)) 759 760 # Resolve 'deptask' dependencies 761 # 762 # e.g. do_sometask[deptask] = "do_someothertask" 763 # (makes sure sometask runs after someothertask of all DEPENDS) 764 if 'deptask' in task_deps and taskname in task_deps['deptask']: 765 tasknames = task_deps['deptask'][taskname].split() 766 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 767 768 # Resolve 'rdeptask' dependencies 769 # 770 # e.g. do_sometask[rdeptask] = "do_someothertask" 771 # (makes sure sometask runs after someothertask of all RDEPENDS) 772 if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']: 773 tasknames = task_deps['rdeptask'][taskname].split() 774 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 775 776 # Resolve inter-task dependencies 777 # 778 # e.g. do_sometask[depends] = "targetname:do_someothertask" 779 # (makes sure sometask runs after targetname's someothertask) 780 idepends = taskData[mc].taskentries[tid].idepends 781 for (depname, idependtask) in idepends: 782 if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps: 783 # Won't be in build_targets if ASSUME_PROVIDED 784 depdata = taskData[mc].build_targets[depname][0] 785 if depdata is not None: 786 t = depdata + ":" + idependtask 787 depends.add(t) 788 if t not in taskData[mc].taskentries: 789 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 790 irdepends = taskData[mc].taskentries[tid].irdepends 791 for (depname, idependtask) in irdepends: 792 if depname in taskData[mc].run_targets: 793 # Won't be in run_targets if ASSUME_PROVIDED 794 if not taskData[mc].run_targets[depname]: 795 continue 796 depdata = taskData[mc].run_targets[depname][0] 797 if depdata is not None: 798 t = depdata + ":" + idependtask 799 depends.add(t) 800 if t not in taskData[mc].taskentries: 801 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 802 803 # Resolve recursive 'recrdeptask' dependencies (Part A) 804 # 805 # e.g. do_sometask[recrdeptask] = "do_someothertask" 806 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 807 # We cover the recursive part of the dependencies below 808 if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']: 809 tasknames = task_deps['recrdeptask'][taskname].split() 810 recursivetasks[tid] = tasknames 811 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 812 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 813 if taskname in tasknames: 814 recursivetasksselfref.add(tid) 815 816 if 'recideptask' in task_deps and taskname in task_deps['recideptask']: 817 recursiveitasks[tid] = [] 818 for t in task_deps['recideptask'][taskname].split(): 819 newdep = build_tid(mc, fn, t) 820 recursiveitasks[tid].append(newdep) 821 822 self.runtaskentries[tid].depends = depends 823 # Remove all self references 824 self.runtaskentries[tid].depends.discard(tid) 825 826 #self.dump_data() 827 828 self.init_progress_reporter.next_stage() 829 bb.event.check_for_interrupts(self.cooker.data) 830 831 # Resolve recursive 'recrdeptask' dependencies (Part B) 832 # 833 # e.g. do_sometask[recrdeptask] = "do_someothertask" 834 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 835 # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed 836 837 # Generating/interating recursive lists of dependencies is painful and potentially slow 838 # Precompute recursive task dependencies here by: 839 # a) create a temp list of reverse dependencies (revdeps) 840 # b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0) 841 # c) combine the total list of dependencies in cumulativedeps 842 # d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower) 843 844 845 revdeps = {} 846 deps = {} 847 cumulativedeps = {} 848 for tid in self.runtaskentries: 849 deps[tid] = set(self.runtaskentries[tid].depends) 850 revdeps[tid] = set() 851 cumulativedeps[tid] = set() 852 # Generate a temp list of reverse dependencies 853 for tid in self.runtaskentries: 854 for dep in self.runtaskentries[tid].depends: 855 revdeps[dep].add(tid) 856 # Find the dependency chain endpoints 857 endpoints = set() 858 for tid in self.runtaskentries: 859 if not deps[tid]: 860 endpoints.add(tid) 861 # Iterate the chains collating dependencies 862 while endpoints: 863 next = set() 864 for tid in endpoints: 865 for dep in revdeps[tid]: 866 cumulativedeps[dep].add(fn_from_tid(tid)) 867 cumulativedeps[dep].update(cumulativedeps[tid]) 868 if tid in deps[dep]: 869 deps[dep].remove(tid) 870 if not deps[dep]: 871 next.add(dep) 872 endpoints = next 873 #for tid in deps: 874 # if deps[tid]: 875 # bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid])) 876 877 # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to 878 # resolve these recursively until we aren't adding any further extra dependencies 879 extradeps = True 880 while extradeps: 881 extradeps = 0 882 for tid in recursivetasks: 883 tasknames = recursivetasks[tid] 884 885 totaldeps = set(self.runtaskentries[tid].depends) 886 if tid in recursiveitasks: 887 totaldeps.update(recursiveitasks[tid]) 888 for dep in recursiveitasks[tid]: 889 if dep not in self.runtaskentries: 890 continue 891 totaldeps.update(self.runtaskentries[dep].depends) 892 893 deps = set() 894 for dep in totaldeps: 895 if dep in cumulativedeps: 896 deps.update(cumulativedeps[dep]) 897 898 for t in deps: 899 for taskname in tasknames: 900 newtid = t + ":" + taskname 901 if newtid == tid: 902 continue 903 if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends: 904 extradeps += 1 905 self.runtaskentries[tid].depends.add(newtid) 906 907 # Handle recursive tasks which depend upon other recursive tasks 908 deps = set() 909 for dep in self.runtaskentries[tid].depends.intersection(recursivetasks): 910 deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends)) 911 for newtid in deps: 912 for taskname in tasknames: 913 if not newtid.endswith(":" + taskname): 914 continue 915 if newtid in self.runtaskentries: 916 extradeps += 1 917 self.runtaskentries[tid].depends.add(newtid) 918 919 bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps) 920 921 # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work 922 for tid in recursivetasksselfref: 923 self.runtaskentries[tid].depends.difference_update(recursivetasksselfref) 924 925 self.init_progress_reporter.next_stage() 926 bb.event.check_for_interrupts(self.cooker.data) 927 928 #self.dump_data() 929 930 # Step B - Mark all active tasks 931 # 932 # Start with the tasks we were asked to run and mark all dependencies 933 # as active too. If the task is to be 'forced', clear its stamp. Once 934 # all active tasks are marked, prune the ones we don't need. 935 936 logger.verbose("Marking Active Tasks") 937 938 def mark_active(tid, depth): 939 """ 940 Mark an item as active along with its depends 941 (calls itself recursively) 942 """ 943 944 if tid in runq_build: 945 return 946 947 runq_build[tid] = 1 948 949 depends = self.runtaskentries[tid].depends 950 for depend in depends: 951 mark_active(depend, depth+1) 952 953 def invalidate_task(tid, error_nostamp): 954 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 955 taskdep = self.dataCaches[mc].task_deps[taskfn] 956 if fn + ":" + taskname not in taskData[mc].taskentries: 957 logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) 958 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 959 if error_nostamp: 960 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) 961 else: 962 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) 963 else: 964 logger.verbose("Invalidate task %s, %s", taskname, fn) 965 bb.parse.siggen.invalidate_task(taskname, taskfn) 966 967 self.target_tids = [] 968 for (mc, target, task, fn) in self.targets: 969 970 if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]: 971 continue 972 973 if target in taskData[mc].failed_deps: 974 continue 975 976 parents = False 977 if task.endswith('-'): 978 parents = True 979 task = task[:-1] 980 981 if fn in taskData[mc].failed_fns: 982 continue 983 984 # fn already has mc prefix 985 tid = fn + ":" + task 986 self.target_tids.append(tid) 987 if tid not in taskData[mc].taskentries: 988 import difflib 989 tasks = [] 990 for x in taskData[mc].taskentries: 991 if x.startswith(fn + ":"): 992 tasks.append(taskname_from_tid(x)) 993 close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7) 994 if close_matches: 995 extra = ". Close matches:\n %s" % "\n ".join(close_matches) 996 else: 997 extra = "" 998 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra)) 999 1000 # For tasks called "XXXX-", ony run their dependencies 1001 if parents: 1002 for i in self.runtaskentries[tid].depends: 1003 mark_active(i, 1) 1004 else: 1005 mark_active(tid, 1) 1006 1007 self.init_progress_reporter.next_stage() 1008 bb.event.check_for_interrupts(self.cooker.data) 1009 1010 # Step C - Prune all inactive tasks 1011 # 1012 # Once all active tasks are marked, prune the ones we don't need. 1013 1014 # Handle --runall 1015 if self.cooker.configuration.runall: 1016 # re-run the mark_active and then drop unused tasks from new list 1017 1018 runall_tids = set() 1019 added = True 1020 while added: 1021 reduced_tasklist = set(self.runtaskentries.keys()) 1022 for tid in list(self.runtaskentries.keys()): 1023 if tid not in runq_build: 1024 reduced_tasklist.remove(tid) 1025 runq_build = {} 1026 1027 orig = runall_tids 1028 runall_tids = set() 1029 for task in self.cooker.configuration.runall: 1030 if not task.startswith("do_"): 1031 task = "do_{0}".format(task) 1032 for tid in reduced_tasklist: 1033 wanttid = "{0}:{1}".format(fn_from_tid(tid), task) 1034 if wanttid in self.runtaskentries: 1035 runall_tids.add(wanttid) 1036 1037 for tid in list(runall_tids): 1038 mark_active(tid, 1) 1039 self.target_tids.append(tid) 1040 if self.cooker.configuration.force: 1041 invalidate_task(tid, False) 1042 added = runall_tids - orig 1043 1044 delcount = set() 1045 for tid in list(self.runtaskentries.keys()): 1046 if tid not in runq_build: 1047 delcount.add(tid) 1048 del self.runtaskentries[tid] 1049 1050 if self.cooker.configuration.runall: 1051 if not self.runtaskentries: 1052 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets))) 1053 1054 self.init_progress_reporter.next_stage() 1055 bb.event.check_for_interrupts(self.cooker.data) 1056 1057 # Handle runonly 1058 if self.cooker.configuration.runonly: 1059 # re-run the mark_active and then drop unused tasks from new list 1060 runq_build = {} 1061 1062 for task in self.cooker.configuration.runonly: 1063 if not task.startswith("do_"): 1064 task = "do_{0}".format(task) 1065 runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task] 1066 1067 for tid in runonly_tids: 1068 mark_active(tid, 1) 1069 if self.cooker.configuration.force: 1070 invalidate_task(tid, False) 1071 1072 for tid in list(self.runtaskentries.keys()): 1073 if tid not in runq_build: 1074 delcount.add(tid) 1075 del self.runtaskentries[tid] 1076 1077 if not self.runtaskentries: 1078 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets))) 1079 1080 # 1081 # Step D - Sanity checks and computation 1082 # 1083 1084 # Check to make sure we still have tasks to run 1085 if not self.runtaskentries: 1086 if not taskData[''].halt: 1087 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") 1088 else: 1089 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") 1090 1091 logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries)) 1092 1093 logger.verbose("Assign Weightings") 1094 1095 self.init_progress_reporter.next_stage() 1096 bb.event.check_for_interrupts(self.cooker.data) 1097 1098 # Generate a list of reverse dependencies to ease future calculations 1099 for tid in self.runtaskentries: 1100 for dep in self.runtaskentries[tid].depends: 1101 self.runtaskentries[dep].revdeps.add(tid) 1102 1103 self.init_progress_reporter.next_stage() 1104 bb.event.check_for_interrupts(self.cooker.data) 1105 1106 # Identify tasks at the end of dependency chains 1107 # Error on circular dependency loops (length two) 1108 endpoints = [] 1109 for tid in self.runtaskentries: 1110 revdeps = self.runtaskentries[tid].revdeps 1111 if not revdeps: 1112 endpoints.append(tid) 1113 for dep in revdeps: 1114 if dep in self.runtaskentries[tid].depends: 1115 bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep)) 1116 1117 1118 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) 1119 1120 self.init_progress_reporter.next_stage() 1121 bb.event.check_for_interrupts(self.cooker.data) 1122 1123 # Calculate task weights 1124 # Check of higher length circular dependencies 1125 self.runq_weight = self.calculate_task_weights(endpoints) 1126 1127 self.init_progress_reporter.next_stage() 1128 bb.event.check_for_interrupts(self.cooker.data) 1129 1130 # Sanity Check - Check for multiple tasks building the same provider 1131 for mc in self.dataCaches: 1132 prov_list = {} 1133 seen_fn = [] 1134 for tid in self.runtaskentries: 1135 (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1136 if taskfn in seen_fn: 1137 continue 1138 if mc != tidmc: 1139 continue 1140 seen_fn.append(taskfn) 1141 for prov in self.dataCaches[mc].fn_provides[taskfn]: 1142 if prov not in prov_list: 1143 prov_list[prov] = [taskfn] 1144 elif taskfn not in prov_list[prov]: 1145 prov_list[prov].append(taskfn) 1146 for prov in prov_list: 1147 if len(prov_list[prov]) < 2: 1148 continue 1149 if prov in self.multi_provider_allowed: 1150 continue 1151 seen_pn = [] 1152 # If two versions of the same PN are being built its fatal, we don't support it. 1153 for fn in prov_list[prov]: 1154 pn = self.dataCaches[mc].pkg_fn[fn] 1155 if pn not in seen_pn: 1156 seen_pn.append(pn) 1157 else: 1158 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) 1159 msgs = ["Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))] 1160 # 1161 # Construct a list of things which uniquely depend on each provider 1162 # since this may help the user figure out which dependency is triggering this warning 1163 # 1164 msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.") 1165 deplist = {} 1166 commondeps = None 1167 for provfn in prov_list[prov]: 1168 deps = set() 1169 for tid in self.runtaskentries: 1170 fn = fn_from_tid(tid) 1171 if fn != provfn: 1172 continue 1173 for dep in self.runtaskentries[tid].revdeps: 1174 fn = fn_from_tid(dep) 1175 if fn == provfn: 1176 continue 1177 deps.add(dep) 1178 if not commondeps: 1179 commondeps = set(deps) 1180 else: 1181 commondeps &= deps 1182 deplist[provfn] = deps 1183 for provfn in deplist: 1184 msgs.append("\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps))) 1185 # 1186 # Construct a list of provides and runtime providers for each recipe 1187 # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC) 1188 # 1189 msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.") 1190 provide_results = {} 1191 rprovide_results = {} 1192 commonprovs = None 1193 commonrprovs = None 1194 for provfn in prov_list[prov]: 1195 provides = set(self.dataCaches[mc].fn_provides[provfn]) 1196 rprovides = set() 1197 for rprovide in self.dataCaches[mc].rproviders: 1198 if provfn in self.dataCaches[mc].rproviders[rprovide]: 1199 rprovides.add(rprovide) 1200 for package in self.dataCaches[mc].packages: 1201 if provfn in self.dataCaches[mc].packages[package]: 1202 rprovides.add(package) 1203 for package in self.dataCaches[mc].packages_dynamic: 1204 if provfn in self.dataCaches[mc].packages_dynamic[package]: 1205 rprovides.add(package) 1206 if not commonprovs: 1207 commonprovs = set(provides) 1208 else: 1209 commonprovs &= provides 1210 provide_results[provfn] = provides 1211 if not commonrprovs: 1212 commonrprovs = set(rprovides) 1213 else: 1214 commonrprovs &= rprovides 1215 rprovide_results[provfn] = rprovides 1216 #msgs.append("\nCommon provides:\n %s" % ("\n ".join(commonprovs))) 1217 #msgs.append("\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs))) 1218 for provfn in prov_list[prov]: 1219 msgs.append("\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs))) 1220 msgs.append("\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs))) 1221 1222 if self.warn_multi_bb: 1223 logger.verbnote("".join(msgs)) 1224 else: 1225 logger.error("".join(msgs)) 1226 1227 self.init_progress_reporter.next_stage() 1228 self.init_progress_reporter.next_stage() 1229 bb.event.check_for_interrupts(self.cooker.data) 1230 1231 # Iterate over the task list looking for tasks with a 'setscene' function 1232 self.runq_setscene_tids = set() 1233 if not self.cooker.configuration.nosetscene: 1234 for tid in self.runtaskentries: 1235 (mc, fn, taskname, _) = split_tid_mcfn(tid) 1236 setscenetid = tid + "_setscene" 1237 if setscenetid not in taskData[mc].taskentries: 1238 continue 1239 self.runq_setscene_tids.add(tid) 1240 1241 self.init_progress_reporter.next_stage() 1242 bb.event.check_for_interrupts(self.cooker.data) 1243 1244 # Invalidate task if force mode active 1245 if self.cooker.configuration.force: 1246 for tid in self.target_tids: 1247 invalidate_task(tid, False) 1248 1249 # Invalidate task if invalidate mode active 1250 if self.cooker.configuration.invalidate_stamp: 1251 for tid in self.target_tids: 1252 fn = fn_from_tid(tid) 1253 for st in self.cooker.configuration.invalidate_stamp.split(','): 1254 if not st.startswith("do_"): 1255 st = "do_%s" % st 1256 invalidate_task(fn + ":" + st, True) 1257 1258 self.init_progress_reporter.next_stage() 1259 bb.event.check_for_interrupts(self.cooker.data) 1260 1261 # Create and print to the logs a virtual/xxxx -> PN (fn) table 1262 for mc in taskData: 1263 virtmap = taskData[mc].get_providermap(prefix="virtual/") 1264 virtpnmap = {} 1265 for v in virtmap: 1266 virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]] 1267 bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v])) 1268 if hasattr(bb.parse.siggen, "tasks_resolved"): 1269 bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc]) 1270 1271 self.init_progress_reporter.next_stage() 1272 bb.event.check_for_interrupts(self.cooker.data) 1273 1274 bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) 1275 1276 # Iterate over the task list and call into the siggen code 1277 dealtwith = set() 1278 todeal = set(self.runtaskentries) 1279 while todeal: 1280 for tid in todeal.copy(): 1281 if not (self.runtaskentries[tid].depends - dealtwith): 1282 dealtwith.add(tid) 1283 todeal.remove(tid) 1284 self.prepare_task_hash(tid) 1285 bb.event.check_for_interrupts(self.cooker.data) 1286 1287 bb.parse.siggen.writeout_file_checksum_cache() 1288 1289 #self.dump_data() 1290 return len(self.runtaskentries) 1291 1292 def prepare_task_hash(self, tid): 1293 bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) 1294 self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) 1295 self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) 1296 1297 def dump_data(self): 1298 """ 1299 Dump some debug information on the internal data structures 1300 """ 1301 logger.debug3("run_tasks:") 1302 for tid in self.runtaskentries: 1303 logger.debug3(" %s: %s Deps %s RevDeps %s", tid, 1304 self.runtaskentries[tid].weight, 1305 self.runtaskentries[tid].depends, 1306 self.runtaskentries[tid].revdeps) 1307 1308class RunQueueWorker(): 1309 def __init__(self, process, pipe): 1310 self.process = process 1311 self.pipe = pipe 1312 1313class RunQueue: 1314 def __init__(self, cooker, cfgData, dataCaches, taskData, targets): 1315 1316 self.cooker = cooker 1317 self.cfgData = cfgData 1318 self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets) 1319 1320 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None 1321 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None 1322 1323 self.state = runQueuePrepare 1324 1325 # For disk space monitor 1326 # Invoked at regular time intervals via the bitbake heartbeat event 1327 # while the build is running. We generate a unique name for the handler 1328 # here, just in case that there ever is more than one RunQueue instance, 1329 # start the handler when reaching runQueueSceneInit, and stop it when 1330 # done with the build. 1331 self.dm = monitordisk.diskMonitor(cfgData) 1332 self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self)) 1333 self.dm_event_handler_registered = False 1334 self.rqexe = None 1335 self.worker = {} 1336 self.fakeworker = {} 1337 1338 @staticmethod 1339 def send_pickled_data(worker, data, name): 1340 msg = bytearray() 1341 msg.extend(b"<" + name.encode() + b">") 1342 pickled_data = pickle.dumps(data) 1343 msg.extend(len(pickled_data).to_bytes(4, 'big')) 1344 msg.extend(pickled_data) 1345 msg.extend(b"</" + name.encode() + b">") 1346 worker.stdin.write(msg) 1347 1348 def _start_worker(self, mc, fakeroot = False, rqexec = None): 1349 logger.debug("Starting bitbake-worker") 1350 magic = "decafbad" 1351 if self.cooker.configuration.profile: 1352 magic = "decafbadbad" 1353 fakerootlogs = None 1354 1355 workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker") 1356 if fakeroot: 1357 magic = magic + "beef" 1358 mcdata = self.cooker.databuilder.mcdata[mc] 1359 fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD")) 1360 fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split() 1361 env = os.environ.copy() 1362 for key, value in (var.split('=',1) for var in fakerootenv): 1363 env[key] = value 1364 worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) 1365 fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs 1366 else: 1367 worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) 1368 bb.utils.nonblockingfd(worker.stdout) 1369 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs) 1370 1371 workerdata = { 1372 "sigdata" : bb.parse.siggen.get_taskdata(), 1373 "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, 1374 "build_verbose_shell" : self.cooker.configuration.build_verbose_shell, 1375 "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout, 1376 "logdefaultdomain" : bb.msg.loggerDefaultDomains, 1377 "prhost" : self.cooker.prhost, 1378 "buildname" : self.cfgData.getVar("BUILDNAME"), 1379 "date" : self.cfgData.getVar("DATE"), 1380 "time" : self.cfgData.getVar("TIME"), 1381 "hashservaddr" : self.cooker.hashservaddr, 1382 "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), 1383 } 1384 1385 RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig") 1386 RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata") 1387 RunQueue.send_pickled_data(worker, workerdata, "workerdata") 1388 worker.stdin.flush() 1389 1390 return RunQueueWorker(worker, workerpipe) 1391 1392 def _teardown_worker(self, worker): 1393 if not worker: 1394 return 1395 logger.debug("Teardown for bitbake-worker") 1396 try: 1397 RunQueue.send_pickled_data(worker.process, b"", "quit") 1398 worker.process.stdin.flush() 1399 worker.process.stdin.close() 1400 except IOError: 1401 pass 1402 while worker.process.returncode is None: 1403 worker.pipe.read() 1404 worker.process.poll() 1405 while worker.pipe.read(): 1406 continue 1407 worker.pipe.close() 1408 1409 def start_worker(self, rqexec): 1410 if self.worker: 1411 self.teardown_workers() 1412 self.teardown = False 1413 for mc in self.rqdata.dataCaches: 1414 self.worker[mc] = self._start_worker(mc, False, rqexec) 1415 1416 def start_fakeworker(self, rqexec, mc): 1417 if not mc in self.fakeworker: 1418 self.fakeworker[mc] = self._start_worker(mc, True, rqexec) 1419 1420 def teardown_workers(self): 1421 self.teardown = True 1422 for mc in self.worker: 1423 self._teardown_worker(self.worker[mc]) 1424 self.worker = {} 1425 for mc in self.fakeworker: 1426 self._teardown_worker(self.fakeworker[mc]) 1427 self.fakeworker = {} 1428 1429 def read_workers(self): 1430 for mc in self.worker: 1431 self.worker[mc].pipe.read() 1432 for mc in self.fakeworker: 1433 self.fakeworker[mc].pipe.read() 1434 1435 def active_fds(self): 1436 fds = [] 1437 for mc in self.worker: 1438 fds.append(self.worker[mc].pipe.input) 1439 for mc in self.fakeworker: 1440 fds.append(self.fakeworker[mc].pipe.input) 1441 return fds 1442 1443 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): 1444 def get_timestamp(f): 1445 try: 1446 if not os.access(f, os.F_OK): 1447 return None 1448 return os.stat(f)[stat.ST_MTIME] 1449 except: 1450 return None 1451 1452 (mc, fn, tn, taskfn) = split_tid_mcfn(tid) 1453 if taskname is None: 1454 taskname = tn 1455 1456 stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn) 1457 1458 # If the stamp is missing, it's not current 1459 if not os.access(stampfile, os.F_OK): 1460 logger.debug2("Stampfile %s not available", stampfile) 1461 return False 1462 # If it's a 'nostamp' task, it's not current 1463 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1464 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 1465 logger.debug2("%s.%s is nostamp\n", fn, taskname) 1466 return False 1467 1468 if taskname.endswith("_setscene"): 1469 return True 1470 1471 if cache is None: 1472 cache = {} 1473 1474 iscurrent = True 1475 t1 = get_timestamp(stampfile) 1476 for dep in self.rqdata.runtaskentries[tid].depends: 1477 if iscurrent: 1478 (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep) 1479 stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2) 1480 stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2) 1481 t2 = get_timestamp(stampfile2) 1482 t3 = get_timestamp(stampfile3) 1483 if t3 and not t2: 1484 continue 1485 if t3 and t3 > t2: 1486 continue 1487 if fn == fn2: 1488 if not t2: 1489 logger.debug2('Stampfile %s does not exist', stampfile2) 1490 iscurrent = False 1491 break 1492 if t1 < t2: 1493 logger.debug2('Stampfile %s < %s', stampfile, stampfile2) 1494 iscurrent = False 1495 break 1496 if recurse and iscurrent: 1497 if dep in cache: 1498 iscurrent = cache[dep] 1499 if not iscurrent: 1500 logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) 1501 else: 1502 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) 1503 cache[dep] = iscurrent 1504 if recurse: 1505 cache[tid] = iscurrent 1506 return iscurrent 1507 1508 def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True): 1509 valid = set() 1510 if self.hashvalidate: 1511 sq_data = {} 1512 sq_data['hash'] = {} 1513 sq_data['hashfn'] = {} 1514 sq_data['unihash'] = {} 1515 for tid in tocheck: 1516 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1517 sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash 1518 sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn] 1519 sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash 1520 1521 valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary) 1522 1523 return valid 1524 1525 def validate_hash(self, sq_data, d, siginfo, currentcount, summary): 1526 locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary} 1527 1528 # Metadata has **kwargs so args can be added, sq_data can also gain new fields 1529 call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)" 1530 1531 return bb.utils.better_eval(call, locs) 1532 1533 def _execute_runqueue(self): 1534 """ 1535 Run the tasks in a queue prepared by rqdata.prepare() 1536 Upon failure, optionally try to recover the build using any alternate providers 1537 (if the halt on failure configuration option isn't set) 1538 """ 1539 1540 retval = True 1541 bb.event.check_for_interrupts(self.cooker.data) 1542 1543 if self.state is runQueuePrepare: 1544 # NOTE: if you add, remove or significantly refactor the stages of this 1545 # process then you should recalculate the weightings here. This is quite 1546 # easy to do - just change the next line temporarily to pass debug=True as 1547 # the last parameter and you'll get a printout of the weightings as well 1548 # as a map to the lines where next_stage() was called. Of course this isn't 1549 # critical, but it helps to keep the progress reporting accurate. 1550 self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data, 1551 "Initialising tasks", 1552 [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244]) 1553 if self.rqdata.prepare() == 0: 1554 self.state = runQueueComplete 1555 else: 1556 self.state = runQueueSceneInit 1557 bb.parse.siggen.save_unitaskhashes() 1558 1559 if self.state is runQueueSceneInit: 1560 self.rqdata.init_progress_reporter.next_stage() 1561 1562 # we are ready to run, emit dependency info to any UI or class which 1563 # needs it 1564 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) 1565 self.rqdata.init_progress_reporter.next_stage() 1566 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) 1567 1568 if not self.dm_event_handler_registered: 1569 res = bb.event.register(self.dm_event_handler_name, 1570 lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False, 1571 ('bb.event.HeartbeatEvent',), data=self.cfgData) 1572 self.dm_event_handler_registered = True 1573 1574 self.rqdata.init_progress_reporter.next_stage() 1575 self.rqexe = RunQueueExecute(self) 1576 1577 dump = self.cooker.configuration.dump_signatures 1578 if dump: 1579 self.rqdata.init_progress_reporter.finish() 1580 if 'printdiff' in dump: 1581 invalidtasks = self.print_diffscenetasks() 1582 self.dump_signatures(dump) 1583 if 'printdiff' in dump: 1584 self.write_diffscenetasks(invalidtasks) 1585 self.state = runQueueComplete 1586 1587 if self.state is runQueueSceneInit: 1588 self.start_worker(self.rqexe) 1589 self.rqdata.init_progress_reporter.finish() 1590 1591 # If we don't have any setscene functions, skip execution 1592 if not self.rqdata.runq_setscene_tids: 1593 logger.info('No setscene tasks') 1594 for tid in self.rqdata.runtaskentries: 1595 if not self.rqdata.runtaskentries[tid].depends: 1596 self.rqexe.setbuildable(tid) 1597 self.rqexe.tasks_notcovered.add(tid) 1598 self.rqexe.sqdone = True 1599 logger.info('Executing Tasks') 1600 self.state = runQueueRunning 1601 1602 if self.state is runQueueRunning: 1603 retval = self.rqexe.execute() 1604 1605 if self.state is runQueueCleanUp: 1606 retval = self.rqexe.finish() 1607 1608 build_done = self.state is runQueueComplete or self.state is runQueueFailed 1609 1610 if build_done and self.dm_event_handler_registered: 1611 bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData) 1612 self.dm_event_handler_registered = False 1613 1614 if build_done and self.rqexe: 1615 bb.parse.siggen.save_unitaskhashes() 1616 self.teardown_workers() 1617 if self.rqexe: 1618 if self.rqexe.stats.failed: 1619 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) 1620 else: 1621 # Let's avoid the word "failed" if nothing actually did 1622 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) 1623 1624 if self.state is runQueueFailed: 1625 raise bb.runqueue.TaskFailure(self.rqexe.failed_tids) 1626 1627 if self.state is runQueueComplete: 1628 # All done 1629 return False 1630 1631 # Loop 1632 return retval 1633 1634 def execute_runqueue(self): 1635 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop. 1636 try: 1637 return self._execute_runqueue() 1638 except bb.runqueue.TaskFailure: 1639 raise 1640 except SystemExit: 1641 raise 1642 except bb.BBHandledException: 1643 try: 1644 self.teardown_workers() 1645 except: 1646 pass 1647 self.state = runQueueComplete 1648 raise 1649 except Exception as err: 1650 logger.exception("An uncaught exception occurred in runqueue") 1651 try: 1652 self.teardown_workers() 1653 except: 1654 pass 1655 self.state = runQueueComplete 1656 raise 1657 1658 def finish_runqueue(self, now = False): 1659 if not self.rqexe: 1660 self.state = runQueueComplete 1661 return 1662 1663 if now: 1664 self.rqexe.finish_now() 1665 else: 1666 self.rqexe.finish() 1667 1668 def _rq_dump_sigtid(self, tids): 1669 for tid in tids: 1670 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1671 dataCaches = self.rqdata.dataCaches 1672 bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True) 1673 1674 def dump_signatures(self, options): 1675 if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset: 1676 bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled") 1677 1678 bb.note("Writing task signature files") 1679 1680 max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1) 1681 def chunkify(l, n): 1682 return [l[i::n] for i in range(n)] 1683 tids = chunkify(list(self.rqdata.runtaskentries), max_process) 1684 # We cannot use the real multiprocessing.Pool easily due to some local data 1685 # that can't be pickled. This is a cheap multi-process solution. 1686 launched = [] 1687 while tids: 1688 if len(launched) < max_process: 1689 p = Process(target=self._rq_dump_sigtid, args=(tids.pop(), )) 1690 p.start() 1691 launched.append(p) 1692 for q in launched: 1693 # The finished processes are joined when calling is_alive() 1694 if not q.is_alive(): 1695 launched.remove(q) 1696 for p in launched: 1697 p.join() 1698 1699 bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options) 1700 1701 return 1702 1703 def print_diffscenetasks(self): 1704 def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid): 1705 invalidtasks = [] 1706 for t in taskdepends[task].depends: 1707 if t not in valid and t not in visited_invalid: 1708 invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid)) 1709 visited_invalid.add(t) 1710 1711 direct_invalid = [t for t in taskdepends[task].depends if t not in valid] 1712 if not direct_invalid and task not in noexec: 1713 invalidtasks = [task] 1714 return invalidtasks 1715 1716 noexec = [] 1717 tocheck = set() 1718 1719 for tid in self.rqdata.runtaskentries: 1720 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1721 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1722 1723 if 'noexec' in taskdep and taskname in taskdep['noexec']: 1724 noexec.append(tid) 1725 continue 1726 1727 tocheck.add(tid) 1728 1729 valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False) 1730 1731 # Tasks which are both setscene and noexec never care about dependencies 1732 # We therefore find tasks which are setscene and noexec and mark their 1733 # unique dependencies as valid. 1734 for tid in noexec: 1735 if tid not in self.rqdata.runq_setscene_tids: 1736 continue 1737 for dep in self.rqdata.runtaskentries[tid].depends: 1738 hasnoexecparents = True 1739 for dep2 in self.rqdata.runtaskentries[dep].revdeps: 1740 if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec: 1741 continue 1742 hasnoexecparents = False 1743 break 1744 if hasnoexecparents: 1745 valid_new.add(dep) 1746 1747 invalidtasks = set() 1748 1749 toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets]) 1750 for tid in toptasks: 1751 toprocess = set([tid]) 1752 while toprocess: 1753 next = set() 1754 visited_invalid = set() 1755 for t in toprocess: 1756 if t not in valid_new and t not in noexec: 1757 invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid)) 1758 continue 1759 if t in self.rqdata.runq_setscene_tids: 1760 for dep in self.rqexe.sqdata.sq_deps[t]: 1761 next.add(dep) 1762 continue 1763 1764 for dep in self.rqdata.runtaskentries[t].depends: 1765 next.add(dep) 1766 1767 toprocess = next 1768 1769 tasklist = [] 1770 for tid in invalidtasks: 1771 tasklist.append(tid) 1772 1773 if tasklist: 1774 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) 1775 1776 return invalidtasks 1777 1778 def write_diffscenetasks(self, invalidtasks): 1779 bb.siggen.check_siggen_version(bb.siggen) 1780 1781 # Define recursion callback 1782 def recursecb(key, hash1, hash2): 1783 hashes = [hash1, hash2] 1784 bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes)) 1785 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) 1786 bb.debug(1, "Found hashfiles:\n{}".format(hashfiles)) 1787 1788 recout = [] 1789 if len(hashfiles) == 2: 1790 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb) 1791 recout.extend(list(' ' + l for l in out2)) 1792 else: 1793 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) 1794 1795 return recout 1796 1797 1798 for tid in invalidtasks: 1799 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1800 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1801 h = self.rqdata.runtaskentries[tid].unihash 1802 bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname)) 1803 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc]) 1804 bb.debug(1, "Found hashfiles:\n{}".format(matches)) 1805 match = None 1806 for m in matches.values(): 1807 if h in m['path']: 1808 match = m['path'] 1809 if match is None: 1810 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid)) 1811 matches = {k : v for k, v in iter(matches.items()) if h not in k} 1812 matches_local = {k : v for k, v in iter(matches.items()) if h not in k and not v['sstate']} 1813 if matches_local: 1814 matches = matches_local 1815 if matches: 1816 latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path'] 1817 prevh = __find_sha256__.search(latestmatch).group(0) 1818 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) 1819 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, most recent matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) 1820 1821 1822class RunQueueExecute: 1823 1824 def __init__(self, rq): 1825 self.rq = rq 1826 self.cooker = rq.cooker 1827 self.cfgData = rq.cfgData 1828 self.rqdata = rq.rqdata 1829 1830 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) 1831 self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" 1832 self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU") 1833 self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO") 1834 self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY") 1835 self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX") 1836 1837 self.sq_buildable = set() 1838 self.sq_running = set() 1839 self.sq_live = set() 1840 1841 self.updated_taskhash_queue = [] 1842 self.pending_migrations = set() 1843 1844 self.runq_buildable = set() 1845 self.runq_running = set() 1846 self.runq_complete = set() 1847 self.runq_tasksrun = set() 1848 1849 self.build_stamps = {} 1850 self.build_stamps2 = [] 1851 self.failed_tids = [] 1852 self.sq_deferred = {} 1853 self.sq_needed_harddeps = set() 1854 self.sq_harddep_deferred = set() 1855 1856 self.stampcache = {} 1857 1858 self.holdoff_tasks = set() 1859 self.holdoff_need_update = True 1860 self.sqdone = False 1861 1862 self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids)) 1863 1864 if self.number_tasks <= 0: 1865 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) 1866 1867 lower_limit = 1.0 1868 upper_limit = 1000000.0 1869 if self.max_cpu_pressure: 1870 self.max_cpu_pressure = float(self.max_cpu_pressure) 1871 if self.max_cpu_pressure < lower_limit: 1872 bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit)) 1873 if self.max_cpu_pressure > upper_limit: 1874 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure)) 1875 1876 if self.max_io_pressure: 1877 self.max_io_pressure = float(self.max_io_pressure) 1878 if self.max_io_pressure < lower_limit: 1879 bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit)) 1880 if self.max_io_pressure > upper_limit: 1881 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1882 1883 if self.max_memory_pressure: 1884 self.max_memory_pressure = float(self.max_memory_pressure) 1885 if self.max_memory_pressure < lower_limit: 1886 bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit)) 1887 if self.max_memory_pressure > upper_limit: 1888 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1889 1890 if self.max_loadfactor: 1891 self.max_loadfactor = float(self.max_loadfactor) 1892 if self.max_loadfactor <= 0: 1893 bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor)) 1894 1895 # List of setscene tasks which we've covered 1896 self.scenequeue_covered = set() 1897 # List of tasks which are covered (including setscene ones) 1898 self.tasks_covered = set() 1899 self.tasks_scenequeue_done = set() 1900 self.scenequeue_notcovered = set() 1901 self.tasks_notcovered = set() 1902 self.scenequeue_notneeded = set() 1903 1904 schedulers = self.get_schedulers() 1905 for scheduler in schedulers: 1906 if self.scheduler == scheduler.name: 1907 self.sched = scheduler(self, self.rqdata) 1908 logger.debug("Using runqueue scheduler '%s'", scheduler.name) 1909 break 1910 else: 1911 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1912 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1913 1914 #if self.rqdata.runq_setscene_tids: 1915 self.sqdata = SQData() 1916 build_scenequeue_data(self.sqdata, self.rqdata, self) 1917 1918 update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True) 1919 1920 # Compute a list of 'stale' sstate tasks where the current hash does not match the one 1921 # in any stamp files. Pass the list out to metadata as an event. 1922 found = {} 1923 for tid in self.rqdata.runq_setscene_tids: 1924 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1925 stamps = bb.build.find_stale_stamps(taskname, taskfn) 1926 if stamps: 1927 if mc not in found: 1928 found[mc] = {} 1929 found[mc][tid] = stamps 1930 for mc in found: 1931 event = bb.event.StaleSetSceneTasks(found[mc]) 1932 bb.event.fire(event, self.cooker.databuilder.mcdata[mc]) 1933 1934 self.build_taskdepdata_cache() 1935 1936 def runqueue_process_waitpid(self, task, status, fakerootlog=None): 1937 1938 # self.build_stamps[pid] may not exist when use shared work directory. 1939 if task in self.build_stamps: 1940 self.build_stamps2.remove(self.build_stamps[task]) 1941 del self.build_stamps[task] 1942 1943 if task in self.sq_live: 1944 if status != 0: 1945 self.sq_task_fail(task, status) 1946 else: 1947 self.sq_task_complete(task) 1948 self.sq_live.remove(task) 1949 self.stats.updateActiveSetscene(len(self.sq_live)) 1950 else: 1951 if status != 0: 1952 self.task_fail(task, status, fakerootlog=fakerootlog) 1953 else: 1954 self.task_complete(task) 1955 return True 1956 1957 def finish_now(self): 1958 for mc in self.rq.worker: 1959 try: 1960 RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow") 1961 self.rq.worker[mc].process.stdin.flush() 1962 except IOError: 1963 # worker must have died? 1964 pass 1965 for mc in self.rq.fakeworker: 1966 try: 1967 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow") 1968 self.rq.fakeworker[mc].process.stdin.flush() 1969 except IOError: 1970 # worker must have died? 1971 pass 1972 1973 if self.failed_tids: 1974 self.rq.state = runQueueFailed 1975 return 1976 1977 self.rq.state = runQueueComplete 1978 return 1979 1980 def finish(self): 1981 self.rq.state = runQueueCleanUp 1982 1983 active = self.stats.active + len(self.sq_live) 1984 if active > 0: 1985 bb.event.fire(runQueueExitWait(active), self.cfgData) 1986 self.rq.read_workers() 1987 return self.rq.active_fds() 1988 1989 if self.failed_tids: 1990 self.rq.state = runQueueFailed 1991 return True 1992 1993 self.rq.state = runQueueComplete 1994 return True 1995 1996 # Used by setscene only 1997 def check_dependencies(self, task, taskdeps): 1998 if not self.rq.depvalidate: 1999 return False 2000 2001 # Must not edit parent data 2002 taskdeps = set(taskdeps) 2003 2004 taskdata = {} 2005 taskdeps.add(task) 2006 for dep in taskdeps: 2007 (mc, fn, taskname, taskfn) = split_tid_mcfn(dep) 2008 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2009 taskdata[dep] = [pn, taskname, fn] 2010 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)" 2011 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data } 2012 valid = bb.utils.better_eval(call, locs) 2013 return valid 2014 2015 def can_start_task(self): 2016 active = self.stats.active + len(self.sq_live) 2017 can_start = active < self.number_tasks 2018 return can_start 2019 2020 def get_schedulers(self): 2021 schedulers = set(obj for obj in globals().values() 2022 if type(obj) is type and 2023 issubclass(obj, RunQueueScheduler)) 2024 2025 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS") 2026 if user_schedulers: 2027 for sched in user_schedulers.split(): 2028 if not "." in sched: 2029 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) 2030 continue 2031 2032 modname, name = sched.rsplit(".", 1) 2033 try: 2034 module = __import__(modname, fromlist=(name,)) 2035 except ImportError as exc: 2036 bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) 2037 else: 2038 schedulers.add(getattr(module, name)) 2039 return schedulers 2040 2041 def setbuildable(self, task): 2042 self.runq_buildable.add(task) 2043 self.sched.newbuildable(task) 2044 2045 def task_completeoutright(self, task): 2046 """ 2047 Mark a task as completed 2048 Look at the reverse dependencies and mark any task with 2049 completed dependencies as buildable 2050 """ 2051 self.runq_complete.add(task) 2052 for revdep in self.rqdata.runtaskentries[task].revdeps: 2053 if revdep in self.runq_running: 2054 continue 2055 if revdep in self.runq_buildable: 2056 continue 2057 alldeps = True 2058 for dep in self.rqdata.runtaskentries[revdep].depends: 2059 if dep not in self.runq_complete: 2060 alldeps = False 2061 break 2062 if alldeps: 2063 self.setbuildable(revdep) 2064 logger.debug("Marking task %s as buildable", revdep) 2065 2066 found = None 2067 for t in sorted(self.sq_deferred.copy()): 2068 if self.sq_deferred[t] == task: 2069 # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task. 2070 # We shouldn't allow all to run at once as it is prone to races. 2071 if not found: 2072 bb.debug(1, "Deferred task %s now buildable" % t) 2073 del self.sq_deferred[t] 2074 update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2075 found = t 2076 else: 2077 bb.debug(1, "Deferring %s after %s" % (t, found)) 2078 self.sq_deferred[t] = found 2079 2080 def task_complete(self, task): 2081 self.stats.taskCompleted() 2082 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2083 self.task_completeoutright(task) 2084 self.runq_tasksrun.add(task) 2085 2086 def task_fail(self, task, exitcode, fakerootlog=None): 2087 """ 2088 Called when a task has failed 2089 Updates the state engine with the failure 2090 """ 2091 self.stats.taskFailed() 2092 self.failed_tids.append(task) 2093 2094 fakeroot_log = [] 2095 if fakerootlog and os.path.exists(fakerootlog): 2096 with open(fakerootlog) as fakeroot_log_file: 2097 fakeroot_failed = False 2098 for line in reversed(fakeroot_log_file.readlines()): 2099 for fakeroot_error in ['mismatch', 'error', 'fatal']: 2100 if fakeroot_error in line.lower(): 2101 fakeroot_failed = True 2102 if 'doing new pid setup and server start' in line: 2103 break 2104 fakeroot_log.append(line) 2105 2106 if not fakeroot_failed: 2107 fakeroot_log = [] 2108 2109 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData) 2110 2111 if self.rqdata.taskData[''].halt: 2112 self.rq.state = runQueueCleanUp 2113 2114 def task_skip(self, task, reason): 2115 self.runq_running.add(task) 2116 self.setbuildable(task) 2117 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData) 2118 self.task_completeoutright(task) 2119 self.stats.taskSkipped() 2120 self.stats.taskCompleted() 2121 2122 def summarise_scenequeue_errors(self): 2123 err = False 2124 if not self.sqdone: 2125 logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) 2126 completeevent = sceneQueueComplete(self.stats, self.rq) 2127 bb.event.fire(completeevent, self.cfgData) 2128 if self.sq_deferred: 2129 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) 2130 err = True 2131 if self.updated_taskhash_queue: 2132 logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) 2133 err = True 2134 if self.holdoff_tasks: 2135 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) 2136 err = True 2137 2138 for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered): 2139 # No task should end up in both covered and uncovered, that is a bug. 2140 logger.error("Setscene task %s in both covered and notcovered." % tid) 2141 2142 for tid in self.rqdata.runq_setscene_tids: 2143 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2144 err = True 2145 logger.error("Setscene Task %s was never marked as covered or not covered" % tid) 2146 if tid not in self.sq_buildable: 2147 err = True 2148 logger.error("Setscene Task %s was never marked as buildable" % tid) 2149 if tid not in self.sq_running: 2150 err = True 2151 logger.error("Setscene Task %s was never marked as running" % tid) 2152 2153 for x in self.rqdata.runtaskentries: 2154 if x not in self.tasks_covered and x not in self.tasks_notcovered: 2155 logger.error("Task %s was never moved from the setscene queue" % x) 2156 err = True 2157 if x not in self.tasks_scenequeue_done: 2158 logger.error("Task %s was never processed by the setscene code" % x) 2159 err = True 2160 if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable: 2161 logger.error("Task %s was never marked as buildable by the setscene code" % x) 2162 err = True 2163 return err 2164 2165 2166 def execute(self): 2167 """ 2168 Run the tasks in a queue prepared by prepare_runqueue 2169 """ 2170 2171 self.rq.read_workers() 2172 if self.updated_taskhash_queue or self.pending_migrations: 2173 self.process_possible_migrations() 2174 2175 if not hasattr(self, "sorted_setscene_tids"): 2176 # Don't want to sort this set every execution 2177 self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids) 2178 2179 task = None 2180 if not self.sqdone and self.can_start_task(): 2181 # Find the next setscene to run 2182 for nexttask in self.sorted_setscene_tids: 2183 if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred: 2184 if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \ 2185 nexttask not in self.sq_needed_harddeps and \ 2186 self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \ 2187 self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): 2188 if nexttask not in self.rqdata.target_tids: 2189 logger.debug2("Skipping setscene for task %s" % nexttask) 2190 self.sq_task_skip(nexttask) 2191 self.scenequeue_notneeded.add(nexttask) 2192 if nexttask in self.sq_deferred: 2193 del self.sq_deferred[nexttask] 2194 return True 2195 if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2196 logger.debug2("Deferring %s due to hard dependencies" % nexttask) 2197 updated = False 2198 for dep in self.sqdata.sq_harddeps_rev[nexttask]: 2199 if dep not in self.sq_needed_harddeps: 2200 logger.debug2("Enabling task %s as it is a hard dependency" % dep) 2201 self.sq_buildable.add(dep) 2202 self.sq_needed_harddeps.add(dep) 2203 updated = True 2204 self.sq_harddep_deferred.add(nexttask) 2205 if updated: 2206 return True 2207 continue 2208 # If covered tasks are running, need to wait for them to complete 2209 for t in self.sqdata.sq_covered_tasks[nexttask]: 2210 if t in self.runq_running and t not in self.runq_complete: 2211 continue 2212 if nexttask in self.sq_deferred: 2213 if self.sq_deferred[nexttask] not in self.runq_complete: 2214 continue 2215 logger.debug("Task %s no longer deferred" % nexttask) 2216 del self.sq_deferred[nexttask] 2217 valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False) 2218 if not valid: 2219 logger.debug("%s didn't become valid, skipping setscene" % nexttask) 2220 self.sq_task_failoutright(nexttask) 2221 return True 2222 if nexttask in self.sqdata.outrightfail: 2223 logger.debug2('No package found, so skipping setscene task %s', nexttask) 2224 self.sq_task_failoutright(nexttask) 2225 return True 2226 if nexttask in self.sqdata.unskippable: 2227 logger.debug2("Setscene task %s is unskippable" % nexttask) 2228 task = nexttask 2229 break 2230 if task is not None: 2231 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2232 taskname = taskname + "_setscene" 2233 if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): 2234 logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task) 2235 self.sq_task_failoutright(task) 2236 return True 2237 2238 if self.cooker.configuration.force: 2239 if task in self.rqdata.target_tids: 2240 self.sq_task_failoutright(task) 2241 return True 2242 2243 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2244 logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task) 2245 self.sq_task_skip(task) 2246 return True 2247 2248 if self.cooker.configuration.skipsetscene: 2249 logger.debug2('No setscene tasks should be executed. Skipping %s', task) 2250 self.sq_task_failoutright(task) 2251 return True 2252 2253 startevent = sceneQueueTaskStarted(task, self.stats, self.rq) 2254 bb.event.fire(startevent, self.cfgData) 2255 2256 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2257 realfn = bb.cache.virtualfn2realfn(taskfn)[0] 2258 runtask = { 2259 'fn' : taskfn, 2260 'task' : task, 2261 'taskname' : taskname, 2262 'taskhash' : self.rqdata.get_task_hash(task), 2263 'unihash' : self.rqdata.get_task_unihash(task), 2264 'quieterrors' : True, 2265 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), 2266 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], 2267 'taskdepdata' : self.sq_build_taskdepdata(task), 2268 'dry_run' : False, 2269 'taskdep': taskdep, 2270 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], 2271 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], 2272 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] 2273 } 2274 2275 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2276 if not mc in self.rq.fakeworker: 2277 self.rq.start_fakeworker(self, mc) 2278 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") 2279 self.rq.fakeworker[mc].process.stdin.flush() 2280 else: 2281 RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") 2282 self.rq.worker[mc].process.stdin.flush() 2283 2284 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2285 self.build_stamps2.append(self.build_stamps[task]) 2286 self.sq_running.add(task) 2287 self.sq_live.add(task) 2288 self.stats.updateActiveSetscene(len(self.sq_live)) 2289 if self.can_start_task(): 2290 return True 2291 2292 self.update_holdofftasks() 2293 2294 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: 2295 hashequiv_logger.verbose("Setscene tasks completed") 2296 2297 err = self.summarise_scenequeue_errors() 2298 if err: 2299 self.rq.state = runQueueFailed 2300 return True 2301 2302 if self.cooker.configuration.setsceneonly: 2303 self.rq.state = runQueueComplete 2304 return True 2305 self.sqdone = True 2306 2307 if self.stats.total == 0: 2308 # nothing to do 2309 self.rq.state = runQueueComplete 2310 return True 2311 2312 if self.cooker.configuration.setsceneonly: 2313 task = None 2314 else: 2315 task = self.sched.next() 2316 if task is not None: 2317 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2318 2319 if self.rqdata.setscene_ignore_tasks is not None: 2320 if self.check_setscene_ignore_tasks(task): 2321 self.task_fail(task, "setscene ignore_tasks") 2322 return True 2323 2324 if task in self.tasks_covered: 2325 logger.debug2("Setscene covered task %s", task) 2326 self.task_skip(task, "covered") 2327 return True 2328 2329 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2330 logger.debug2("Stamp current task %s", task) 2331 2332 self.task_skip(task, "existing") 2333 self.runq_tasksrun.add(task) 2334 return True 2335 2336 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2337 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2338 startevent = runQueueTaskStarted(task, self.stats, self.rq, 2339 noexec=True) 2340 bb.event.fire(startevent, self.cfgData) 2341 self.runq_running.add(task) 2342 self.stats.taskActive() 2343 if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2344 bb.build.make_stamp_mcfn(taskname, taskfn) 2345 self.task_complete(task) 2346 return True 2347 else: 2348 startevent = runQueueTaskStarted(task, self.stats, self.rq) 2349 bb.event.fire(startevent, self.cfgData) 2350 2351 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2352 realfn = bb.cache.virtualfn2realfn(taskfn)[0] 2353 runtask = { 2354 'fn' : taskfn, 2355 'task' : task, 2356 'taskname' : taskname, 2357 'taskhash' : self.rqdata.get_task_hash(task), 2358 'unihash' : self.rqdata.get_task_unihash(task), 2359 'quieterrors' : False, 2360 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), 2361 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], 2362 'taskdepdata' : self.build_taskdepdata(task), 2363 'dry_run' : self.rqdata.setscene_enforce, 2364 'taskdep': taskdep, 2365 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], 2366 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], 2367 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] 2368 } 2369 2370 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2371 if not mc in self.rq.fakeworker: 2372 try: 2373 self.rq.start_fakeworker(self, mc) 2374 except OSError as exc: 2375 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) 2376 self.rq.state = runQueueFailed 2377 self.stats.taskFailed() 2378 return True 2379 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") 2380 self.rq.fakeworker[mc].process.stdin.flush() 2381 else: 2382 RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") 2383 self.rq.worker[mc].process.stdin.flush() 2384 2385 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2386 self.build_stamps2.append(self.build_stamps[task]) 2387 self.runq_running.add(task) 2388 self.stats.taskActive() 2389 if self.can_start_task(): 2390 return True 2391 2392 if self.stats.active > 0 or self.sq_live: 2393 self.rq.read_workers() 2394 return self.rq.active_fds() 2395 2396 # No more tasks can be run. If we have deferred setscene tasks we should run them. 2397 if self.sq_deferred: 2398 deferred_tid = list(self.sq_deferred.keys())[0] 2399 blocking_tid = self.sq_deferred.pop(deferred_tid) 2400 logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid)) 2401 return True 2402 2403 if self.failed_tids: 2404 self.rq.state = runQueueFailed 2405 return True 2406 2407 # Sanity Checks 2408 err = self.summarise_scenequeue_errors() 2409 for task in self.rqdata.runtaskentries: 2410 if task not in self.runq_buildable: 2411 logger.error("Task %s never buildable!", task) 2412 err = True 2413 elif task not in self.runq_running: 2414 logger.error("Task %s never ran!", task) 2415 err = True 2416 elif task not in self.runq_complete: 2417 logger.error("Task %s never completed!", task) 2418 err = True 2419 2420 if err: 2421 self.rq.state = runQueueFailed 2422 else: 2423 self.rq.state = runQueueComplete 2424 2425 return True 2426 2427 def filtermcdeps(self, task, mc, deps): 2428 ret = set() 2429 for dep in deps: 2430 thismc = mc_from_tid(dep) 2431 if thismc != mc: 2432 continue 2433 ret.add(dep) 2434 return ret 2435 2436 # Build the individual cache entries in advance once to save time 2437 def build_taskdepdata_cache(self): 2438 taskdepdata_cache = {} 2439 for task in self.rqdata.runtaskentries: 2440 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2441 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2442 deps = self.rqdata.runtaskentries[task].depends 2443 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2444 taskhash = self.rqdata.runtaskentries[task].hash 2445 unihash = self.rqdata.runtaskentries[task].unihash 2446 deps = self.filtermcdeps(task, mc, deps) 2447 hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn] 2448 taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn] 2449 2450 self.taskdepdata_cache = taskdepdata_cache 2451 2452 # We filter out multiconfig dependencies from taskdepdata we pass to the tasks 2453 # as most code can't handle them 2454 def build_taskdepdata(self, task): 2455 taskdepdata = {} 2456 mc = mc_from_tid(task) 2457 next = self.rqdata.runtaskentries[task].depends.copy() 2458 next.add(task) 2459 next = self.filtermcdeps(task, mc, next) 2460 while next: 2461 additional = [] 2462 for revdep in next: 2463 self.taskdepdata_cache[revdep][6] = self.rqdata.runtaskentries[revdep].unihash 2464 taskdepdata[revdep] = self.taskdepdata_cache[revdep] 2465 for revdep2 in self.taskdepdata_cache[revdep][3]: 2466 if revdep2 not in taskdepdata: 2467 additional.append(revdep2) 2468 next = additional 2469 2470 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2471 return taskdepdata 2472 2473 def update_holdofftasks(self): 2474 2475 if not self.holdoff_need_update: 2476 return 2477 2478 notcovered = set(self.scenequeue_notcovered) 2479 notcovered |= self.sqdata.cantskip 2480 for tid in self.scenequeue_notcovered: 2481 notcovered |= self.sqdata.sq_covered_tasks[tid] 2482 notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) 2483 notcovered.intersection_update(self.tasks_scenequeue_done) 2484 2485 covered = set(self.scenequeue_covered) 2486 for tid in self.scenequeue_covered: 2487 covered |= self.sqdata.sq_covered_tasks[tid] 2488 covered.difference_update(notcovered) 2489 covered.intersection_update(self.tasks_scenequeue_done) 2490 2491 for tid in notcovered | covered: 2492 if not self.rqdata.runtaskentries[tid].depends: 2493 self.setbuildable(tid) 2494 elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): 2495 self.setbuildable(tid) 2496 2497 self.tasks_covered = covered 2498 self.tasks_notcovered = notcovered 2499 2500 self.holdoff_tasks = set() 2501 2502 for tid in self.rqdata.runq_setscene_tids: 2503 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2504 self.holdoff_tasks.add(tid) 2505 2506 for tid in self.holdoff_tasks.copy(): 2507 for dep in self.sqdata.sq_covered_tasks[tid]: 2508 if dep not in self.runq_complete: 2509 self.holdoff_tasks.add(dep) 2510 2511 self.holdoff_need_update = False 2512 2513 def process_possible_migrations(self): 2514 2515 changed = set() 2516 toprocess = set() 2517 for tid, unihash in self.updated_taskhash_queue.copy(): 2518 if tid in self.runq_running and tid not in self.runq_complete: 2519 continue 2520 2521 self.updated_taskhash_queue.remove((tid, unihash)) 2522 2523 if unihash != self.rqdata.runtaskentries[tid].unihash: 2524 # Make sure we rehash any other tasks with the same task hash that we're deferred against. 2525 torehash = [tid] 2526 for deftid in self.sq_deferred: 2527 if self.sq_deferred[deftid] == tid: 2528 torehash.append(deftid) 2529 for hashtid in torehash: 2530 hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) 2531 self.rqdata.runtaskentries[hashtid].unihash = unihash 2532 bb.parse.siggen.set_unihash(hashtid, unihash) 2533 toprocess.add(hashtid) 2534 if torehash: 2535 # Need to save after set_unihash above 2536 bb.parse.siggen.save_unitaskhashes() 2537 2538 # Work out all tasks which depend upon these 2539 total = set() 2540 next = set() 2541 for p in toprocess: 2542 next |= self.rqdata.runtaskentries[p].revdeps 2543 while next: 2544 current = next.copy() 2545 total = total | next 2546 next = set() 2547 for ntid in current: 2548 next |= self.rqdata.runtaskentries[ntid].revdeps 2549 next.difference_update(total) 2550 2551 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash 2552 next = set() 2553 for p in total: 2554 if not self.rqdata.runtaskentries[p].depends: 2555 next.add(p) 2556 elif self.rqdata.runtaskentries[p].depends.isdisjoint(total): 2557 next.add(p) 2558 2559 # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled 2560 while next: 2561 current = next.copy() 2562 next = set() 2563 for tid in current: 2564 if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): 2565 continue 2566 orighash = self.rqdata.runtaskentries[tid].hash 2567 newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches) 2568 origuni = self.rqdata.runtaskentries[tid].unihash 2569 newuni = bb.parse.siggen.get_unihash(tid) 2570 # FIXME, need to check it can come from sstate at all for determinism? 2571 remapped = False 2572 if newuni == origuni: 2573 # Nothing to do, we match, skip code below 2574 remapped = True 2575 elif tid in self.scenequeue_covered or tid in self.sq_live: 2576 # Already ran this setscene task or it running. Report the new taskhash 2577 bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches) 2578 hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid)) 2579 remapped = True 2580 2581 if not remapped: 2582 #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni)) 2583 self.rqdata.runtaskentries[tid].hash = newhash 2584 self.rqdata.runtaskentries[tid].unihash = newuni 2585 changed.add(tid) 2586 2587 next |= self.rqdata.runtaskentries[tid].revdeps 2588 total.remove(tid) 2589 next.intersection_update(total) 2590 2591 if changed: 2592 for mc in self.rq.worker: 2593 RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") 2594 for mc in self.rq.fakeworker: 2595 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") 2596 2597 hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) 2598 2599 for tid in changed: 2600 if tid not in self.rqdata.runq_setscene_tids: 2601 continue 2602 if tid not in self.pending_migrations: 2603 self.pending_migrations.add(tid) 2604 2605 update_tasks = [] 2606 for tid in self.pending_migrations.copy(): 2607 if tid in self.runq_running or tid in self.sq_live: 2608 # Too late, task already running, not much we can do now 2609 self.pending_migrations.remove(tid) 2610 continue 2611 2612 valid = True 2613 # Check no tasks this covers are running 2614 for dep in self.sqdata.sq_covered_tasks[tid]: 2615 if dep in self.runq_running and dep not in self.runq_complete: 2616 hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid)) 2617 valid = False 2618 break 2619 if not valid: 2620 continue 2621 2622 self.pending_migrations.remove(tid) 2623 changed = True 2624 2625 if tid in self.tasks_scenequeue_done: 2626 self.tasks_scenequeue_done.remove(tid) 2627 for dep in self.sqdata.sq_covered_tasks[tid]: 2628 if dep in self.runq_complete and dep not in self.runq_tasksrun: 2629 bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep) 2630 self.failed_tids.append(tid) 2631 self.rq.state = runQueueCleanUp 2632 return 2633 2634 if dep not in self.runq_complete: 2635 if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable: 2636 self.tasks_scenequeue_done.remove(dep) 2637 2638 if tid in self.sq_buildable: 2639 self.sq_buildable.remove(tid) 2640 if tid in self.sq_running: 2641 self.sq_running.remove(tid) 2642 if tid in self.sqdata.outrightfail: 2643 self.sqdata.outrightfail.remove(tid) 2644 if tid in self.scenequeue_notcovered: 2645 self.scenequeue_notcovered.remove(tid) 2646 if tid in self.scenequeue_covered: 2647 self.scenequeue_covered.remove(tid) 2648 if tid in self.scenequeue_notneeded: 2649 self.scenequeue_notneeded.remove(tid) 2650 2651 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2652 self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2653 2654 if tid in self.stampcache: 2655 del self.stampcache[tid] 2656 2657 if tid in self.build_stamps: 2658 del self.build_stamps[tid] 2659 2660 update_tasks.append(tid) 2661 2662 update_tasks2 = [] 2663 for tid in update_tasks: 2664 harddepfail = False 2665 for t in self.sqdata.sq_harddeps_rev[tid]: 2666 if t in self.scenequeue_notcovered: 2667 harddepfail = True 2668 break 2669 if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2670 if tid not in self.sq_buildable: 2671 self.sq_buildable.add(tid) 2672 if not self.sqdata.sq_revdeps[tid]: 2673 self.sq_buildable.add(tid) 2674 2675 update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid)) 2676 2677 if update_tasks2: 2678 self.sqdone = False 2679 for mc in sorted(self.sqdata.multiconfigs): 2680 for tid in sorted([t[0] for t in update_tasks2]): 2681 if mc_from_tid(tid) != mc: 2682 continue 2683 h = pending_hash_index(tid, self.rqdata) 2684 if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]: 2685 self.sq_deferred[tid] = self.sqdata.hashes[h] 2686 bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h])) 2687 update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2688 2689 for (tid, harddepfail, origvalid) in update_tasks2: 2690 if tid in self.sqdata.valid and not origvalid: 2691 hashequiv_logger.verbose("Setscene task %s became valid" % tid) 2692 if harddepfail: 2693 logger.debug2("%s has an unavailable hard dependency so skipping" % (tid)) 2694 self.sq_task_failoutright(tid) 2695 2696 if changed: 2697 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2698 self.sq_needed_harddeps = set() 2699 self.sq_harddep_deferred = set() 2700 self.holdoff_need_update = True 2701 2702 def scenequeue_updatecounters(self, task, fail=False): 2703 2704 if fail and task in self.sqdata.sq_harddeps: 2705 for dep in sorted(self.sqdata.sq_harddeps[task]): 2706 if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered: 2707 # dependency could be already processed, e.g. noexec setscene task 2708 continue 2709 noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache) 2710 if noexec or stamppresent: 2711 continue 2712 logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) 2713 self.sq_task_failoutright(dep) 2714 continue 2715 for dep in sorted(self.sqdata.sq_deps[task]): 2716 if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2717 if dep not in self.sq_buildable: 2718 self.sq_buildable.add(dep) 2719 2720 next = set([task]) 2721 while next: 2722 new = set() 2723 for t in sorted(next): 2724 self.tasks_scenequeue_done.add(t) 2725 # Look down the dependency chain for non-setscene things which this task depends on 2726 # and mark as 'done' 2727 for dep in self.rqdata.runtaskentries[t].depends: 2728 if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: 2729 continue 2730 if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): 2731 new.add(dep) 2732 next = new 2733 2734 # If this task was one which other setscene tasks have a hard dependency upon, we need 2735 # to walk through the hard dependencies and allow execution of those which have completed dependencies. 2736 if task in self.sqdata.sq_harddeps: 2737 for dep in self.sq_harddep_deferred.copy(): 2738 if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2739 self.sq_harddep_deferred.remove(dep) 2740 2741 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2742 self.holdoff_need_update = True 2743 2744 def sq_task_completeoutright(self, task): 2745 """ 2746 Mark a task as completed 2747 Look at the reverse dependencies and mark any task with 2748 completed dependencies as buildable 2749 """ 2750 2751 logger.debug('Found task %s which could be accelerated', task) 2752 self.scenequeue_covered.add(task) 2753 self.scenequeue_updatecounters(task) 2754 2755 def sq_check_taskfail(self, task): 2756 if self.rqdata.setscene_ignore_tasks is not None: 2757 realtask = task.split('_setscene')[0] 2758 (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) 2759 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2760 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2761 logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) 2762 self.rq.state = runQueueCleanUp 2763 2764 def sq_task_complete(self, task): 2765 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2766 self.sq_task_completeoutright(task) 2767 2768 def sq_task_fail(self, task, result): 2769 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData) 2770 self.scenequeue_notcovered.add(task) 2771 self.scenequeue_updatecounters(task, True) 2772 self.sq_check_taskfail(task) 2773 2774 def sq_task_failoutright(self, task): 2775 self.sq_running.add(task) 2776 self.sq_buildable.add(task) 2777 self.scenequeue_notcovered.add(task) 2778 self.scenequeue_updatecounters(task, True) 2779 2780 def sq_task_skip(self, task): 2781 self.sq_running.add(task) 2782 self.sq_buildable.add(task) 2783 self.sq_task_completeoutright(task) 2784 2785 def sq_build_taskdepdata(self, task): 2786 def getsetscenedeps(tid): 2787 deps = set() 2788 (mc, fn, taskname, _) = split_tid_mcfn(tid) 2789 realtid = tid + "_setscene" 2790 idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends 2791 for (depname, idependtask) in idepends: 2792 if depname not in self.rqdata.taskData[mc].build_targets: 2793 continue 2794 2795 depfn = self.rqdata.taskData[mc].build_targets[depname][0] 2796 if depfn is None: 2797 continue 2798 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2799 deps.add(deptid) 2800 return deps 2801 2802 taskdepdata = {} 2803 next = getsetscenedeps(task) 2804 next.add(task) 2805 while next: 2806 additional = [] 2807 for revdep in next: 2808 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2809 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2810 deps = getsetscenedeps(revdep) 2811 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] 2812 taskhash = self.rqdata.runtaskentries[revdep].hash 2813 unihash = self.rqdata.runtaskentries[revdep].unihash 2814 hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn] 2815 taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn] 2816 for revdep2 in deps: 2817 if revdep2 not in taskdepdata: 2818 additional.append(revdep2) 2819 next = additional 2820 2821 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2822 return taskdepdata 2823 2824 def check_setscene_ignore_tasks(self, tid): 2825 # Check task that is going to run against the ignore tasks list 2826 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2827 # Ignore covered tasks 2828 if tid in self.tasks_covered: 2829 return False 2830 # Ignore stamped tasks 2831 if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): 2832 return False 2833 # Ignore noexec tasks 2834 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2835 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2836 return False 2837 2838 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2839 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2840 if tid in self.rqdata.runq_setscene_tids: 2841 msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)] 2842 else: 2843 msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)] 2844 for t in self.scenequeue_notcovered: 2845 msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash)) 2846 msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered)) 2847 logger.error("".join(msg)) 2848 return True 2849 return False 2850 2851class SQData(object): 2852 def __init__(self): 2853 # SceneQueue dependencies 2854 self.sq_deps = {} 2855 # SceneQueue reverse dependencies 2856 self.sq_revdeps = {} 2857 # Injected inter-setscene task dependencies 2858 self.sq_harddeps = {} 2859 self.sq_harddeps_rev = {} 2860 # Cache of stamp files so duplicates can't run in parallel 2861 self.stamps = {} 2862 # Setscene tasks directly depended upon by the build 2863 self.unskippable = set() 2864 # List of setscene tasks which aren't present 2865 self.outrightfail = set() 2866 # A list of normal tasks a setscene task covers 2867 self.sq_covered_tasks = {} 2868 2869def build_scenequeue_data(sqdata, rqdata, sqrq): 2870 2871 sq_revdeps = {} 2872 sq_revdeps_squash = {} 2873 sq_collated_deps = {} 2874 2875 # We can't skip specified target tasks which aren't setscene tasks 2876 sqdata.cantskip = set(rqdata.target_tids) 2877 sqdata.cantskip.difference_update(rqdata.runq_setscene_tids) 2878 sqdata.cantskip.intersection_update(rqdata.runtaskentries) 2879 2880 # We need to construct a dependency graph for the setscene functions. Intermediate 2881 # dependencies between the setscene tasks only complicate the code. This code 2882 # therefore aims to collapse the huge runqueue dependency tree into a smaller one 2883 # only containing the setscene functions. 2884 2885 rqdata.init_progress_reporter.next_stage() 2886 2887 # First process the chains up to the first setscene task. 2888 endpoints = {} 2889 for tid in rqdata.runtaskentries: 2890 sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) 2891 sq_revdeps_squash[tid] = set() 2892 if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids: 2893 #bb.warn("Added endpoint %s" % (tid)) 2894 endpoints[tid] = set() 2895 2896 rqdata.init_progress_reporter.next_stage() 2897 2898 # Secondly process the chains between setscene tasks. 2899 for tid in rqdata.runq_setscene_tids: 2900 sq_collated_deps[tid] = set() 2901 #bb.warn("Added endpoint 2 %s" % (tid)) 2902 for dep in rqdata.runtaskentries[tid].depends: 2903 if tid in sq_revdeps[dep]: 2904 sq_revdeps[dep].remove(tid) 2905 if dep not in endpoints: 2906 endpoints[dep] = set() 2907 #bb.warn(" Added endpoint 3 %s" % (dep)) 2908 endpoints[dep].add(tid) 2909 2910 rqdata.init_progress_reporter.next_stage() 2911 2912 def process_endpoints(endpoints): 2913 newendpoints = {} 2914 for point, task in endpoints.items(): 2915 tasks = set() 2916 if task: 2917 tasks |= task 2918 if sq_revdeps_squash[point]: 2919 tasks |= sq_revdeps_squash[point] 2920 if point not in rqdata.runq_setscene_tids: 2921 for t in tasks: 2922 sq_collated_deps[t].add(point) 2923 sq_revdeps_squash[point] = set() 2924 if point in rqdata.runq_setscene_tids: 2925 sq_revdeps_squash[point] = tasks 2926 continue 2927 for dep in rqdata.runtaskentries[point].depends: 2928 if point in sq_revdeps[dep]: 2929 sq_revdeps[dep].remove(point) 2930 if tasks: 2931 sq_revdeps_squash[dep] |= tasks 2932 if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids: 2933 newendpoints[dep] = task 2934 if newendpoints: 2935 process_endpoints(newendpoints) 2936 2937 process_endpoints(endpoints) 2938 2939 rqdata.init_progress_reporter.next_stage() 2940 2941 # Build a list of tasks which are "unskippable" 2942 # These are direct endpoints referenced by the build upto and including setscene tasks 2943 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon 2944 new = True 2945 for tid in rqdata.runtaskentries: 2946 if not rqdata.runtaskentries[tid].revdeps: 2947 sqdata.unskippable.add(tid) 2948 sqdata.unskippable |= sqdata.cantskip 2949 while new: 2950 new = False 2951 orig = sqdata.unskippable.copy() 2952 for tid in sorted(orig, reverse=True): 2953 if tid in rqdata.runq_setscene_tids: 2954 continue 2955 if not rqdata.runtaskentries[tid].depends: 2956 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable 2957 sqrq.setbuildable(tid) 2958 sqdata.unskippable |= rqdata.runtaskentries[tid].depends 2959 if sqdata.unskippable != orig: 2960 new = True 2961 2962 sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids) 2963 2964 rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) 2965 2966 # Sanity check all dependencies could be changed to setscene task references 2967 for taskcounter, tid in enumerate(rqdata.runtaskentries): 2968 if tid in rqdata.runq_setscene_tids: 2969 pass 2970 elif sq_revdeps_squash[tid]: 2971 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.") 2972 else: 2973 del sq_revdeps_squash[tid] 2974 rqdata.init_progress_reporter.update(taskcounter) 2975 2976 rqdata.init_progress_reporter.next_stage() 2977 2978 # Resolve setscene inter-task dependencies 2979 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" 2980 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies 2981 for tid in rqdata.runq_setscene_tids: 2982 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2983 realtid = tid + "_setscene" 2984 idepends = rqdata.taskData[mc].taskentries[realtid].idepends 2985 sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2986 2987 sqdata.sq_harddeps_rev[tid] = set() 2988 for (depname, idependtask) in idepends: 2989 2990 if depname not in rqdata.taskData[mc].build_targets: 2991 continue 2992 2993 depfn = rqdata.taskData[mc].build_targets[depname][0] 2994 if depfn is None: 2995 continue 2996 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2997 if deptid not in rqdata.runtaskentries: 2998 bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) 2999 3000 logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid)) 3001 3002 if not deptid in sqdata.sq_harddeps: 3003 sqdata.sq_harddeps[deptid] = set() 3004 sqdata.sq_harddeps[deptid].add(tid) 3005 sqdata.sq_harddeps_rev[tid].add(deptid) 3006 3007 rqdata.init_progress_reporter.next_stage() 3008 3009 rqdata.init_progress_reporter.next_stage() 3010 3011 #for tid in sq_revdeps_squash: 3012 # data = "" 3013 # for dep in sq_revdeps_squash[tid]: 3014 # data = data + "\n %s" % dep 3015 # bb.warn("Task %s_setscene: is %s " % (tid, data)) 3016 3017 sqdata.sq_revdeps = sq_revdeps_squash 3018 sqdata.sq_covered_tasks = sq_collated_deps 3019 3020 # Build reverse version of revdeps to populate deps structure 3021 for tid in sqdata.sq_revdeps: 3022 sqdata.sq_deps[tid] = set() 3023 for tid in sqdata.sq_revdeps: 3024 for dep in sqdata.sq_revdeps[tid]: 3025 sqdata.sq_deps[dep].add(tid) 3026 3027 rqdata.init_progress_reporter.next_stage() 3028 3029 sqdata.multiconfigs = set() 3030 for tid in sqdata.sq_revdeps: 3031 sqdata.multiconfigs.add(mc_from_tid(tid)) 3032 if not sqdata.sq_revdeps[tid]: 3033 sqrq.sq_buildable.add(tid) 3034 3035 rqdata.init_progress_reporter.next_stage() 3036 3037 sqdata.noexec = set() 3038 sqdata.stamppresent = set() 3039 sqdata.valid = set() 3040 3041 sqdata.hashes = {} 3042 sqrq.sq_deferred = {} 3043 for mc in sorted(sqdata.multiconfigs): 3044 for tid in sorted(sqdata.sq_revdeps): 3045 if mc_from_tid(tid) != mc: 3046 continue 3047 h = pending_hash_index(tid, rqdata) 3048 if h not in sqdata.hashes: 3049 sqdata.hashes[h] = tid 3050 else: 3051 sqrq.sq_deferred[tid] = sqdata.hashes[h] 3052 bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h])) 3053 3054def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False): 3055 3056 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 3057 3058 taskdep = rqdata.dataCaches[mc].task_deps[taskfn] 3059 3060 if 'noexec' in taskdep and taskname in taskdep['noexec']: 3061 bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn) 3062 return True, False 3063 3064 if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache): 3065 logger.debug2('Setscene stamp current for task %s', tid) 3066 return False, True 3067 3068 if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache): 3069 logger.debug2('Normal stamp current for task %s', tid) 3070 return False, True 3071 3072 return False, False 3073 3074def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): 3075 3076 tocheck = set() 3077 3078 for tid in sorted(tids): 3079 if tid in sqdata.stamppresent: 3080 sqdata.stamppresent.remove(tid) 3081 if tid in sqdata.valid: 3082 sqdata.valid.remove(tid) 3083 if tid in sqdata.outrightfail: 3084 sqdata.outrightfail.remove(tid) 3085 3086 noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True) 3087 3088 if noexec: 3089 sqdata.noexec.add(tid) 3090 sqrq.sq_task_skip(tid) 3091 logger.debug2("%s is noexec so skipping setscene" % (tid)) 3092 continue 3093 3094 if stamppresent: 3095 sqdata.stamppresent.add(tid) 3096 sqrq.sq_task_skip(tid) 3097 logger.debug2("%s has a valid stamp, skipping" % (tid)) 3098 continue 3099 3100 tocheck.add(tid) 3101 3102 sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) 3103 3104 for tid in tids: 3105 if tid in sqdata.stamppresent: 3106 continue 3107 if tid in sqdata.valid: 3108 continue 3109 if tid in sqdata.noexec: 3110 continue 3111 if tid in sqrq.scenequeue_covered: 3112 continue 3113 if tid in sqrq.scenequeue_notcovered: 3114 continue 3115 if tid in sqrq.sq_deferred: 3116 continue 3117 sqdata.outrightfail.add(tid) 3118 logger.debug2("%s already handled (fallthrough), skipping" % (tid)) 3119 3120class TaskFailure(Exception): 3121 """ 3122 Exception raised when a task in a runqueue fails 3123 """ 3124 def __init__(self, x): 3125 self.args = x 3126 3127 3128class runQueueExitWait(bb.event.Event): 3129 """ 3130 Event when waiting for task processes to exit 3131 """ 3132 3133 def __init__(self, remain): 3134 self.remain = remain 3135 self.message = "Waiting for %s active tasks to finish" % remain 3136 bb.event.Event.__init__(self) 3137 3138class runQueueEvent(bb.event.Event): 3139 """ 3140 Base runQueue event class 3141 """ 3142 def __init__(self, task, stats, rq): 3143 self.taskid = task 3144 self.taskstring = task 3145 self.taskname = taskname_from_tid(task) 3146 self.taskfile = fn_from_tid(task) 3147 self.taskhash = rq.rqdata.get_task_hash(task) 3148 self.stats = stats.copy() 3149 bb.event.Event.__init__(self) 3150 3151class sceneQueueEvent(runQueueEvent): 3152 """ 3153 Base sceneQueue event class 3154 """ 3155 def __init__(self, task, stats, rq, noexec=False): 3156 runQueueEvent.__init__(self, task, stats, rq) 3157 self.taskstring = task + "_setscene" 3158 self.taskname = taskname_from_tid(task) + "_setscene" 3159 self.taskfile = fn_from_tid(task) 3160 self.taskhash = rq.rqdata.get_task_hash(task) 3161 3162class runQueueTaskStarted(runQueueEvent): 3163 """ 3164 Event notifying a task was started 3165 """ 3166 def __init__(self, task, stats, rq, noexec=False): 3167 runQueueEvent.__init__(self, task, stats, rq) 3168 self.noexec = noexec 3169 3170class sceneQueueTaskStarted(sceneQueueEvent): 3171 """ 3172 Event notifying a setscene task was started 3173 """ 3174 def __init__(self, task, stats, rq, noexec=False): 3175 sceneQueueEvent.__init__(self, task, stats, rq) 3176 self.noexec = noexec 3177 3178class runQueueTaskFailed(runQueueEvent): 3179 """ 3180 Event notifying a task failed 3181 """ 3182 def __init__(self, task, stats, exitcode, rq, fakeroot_log=None): 3183 runQueueEvent.__init__(self, task, stats, rq) 3184 self.exitcode = exitcode 3185 self.fakeroot_log = fakeroot_log 3186 3187 def __str__(self): 3188 if self.fakeroot_log: 3189 return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log) 3190 else: 3191 return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) 3192 3193class sceneQueueTaskFailed(sceneQueueEvent): 3194 """ 3195 Event notifying a setscene task failed 3196 """ 3197 def __init__(self, task, stats, exitcode, rq): 3198 sceneQueueEvent.__init__(self, task, stats, rq) 3199 self.exitcode = exitcode 3200 3201 def __str__(self): 3202 return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode) 3203 3204class sceneQueueComplete(sceneQueueEvent): 3205 """ 3206 Event when all the sceneQueue tasks are complete 3207 """ 3208 def __init__(self, stats, rq): 3209 self.stats = stats.copy() 3210 bb.event.Event.__init__(self) 3211 3212class runQueueTaskCompleted(runQueueEvent): 3213 """ 3214 Event notifying a task completed 3215 """ 3216 3217class sceneQueueTaskCompleted(sceneQueueEvent): 3218 """ 3219 Event notifying a setscene task completed 3220 """ 3221 3222class runQueueTaskSkipped(runQueueEvent): 3223 """ 3224 Event notifying a task was skipped 3225 """ 3226 def __init__(self, task, stats, rq, reason): 3227 runQueueEvent.__init__(self, task, stats, rq) 3228 self.reason = reason 3229 3230class taskUniHashUpdate(bb.event.Event): 3231 """ 3232 Base runQueue event class 3233 """ 3234 def __init__(self, task, unihash): 3235 self.taskid = task 3236 self.unihash = unihash 3237 bb.event.Event.__init__(self) 3238 3239class runQueuePipe(): 3240 """ 3241 Abstraction for a pipe between a worker thread and the server 3242 """ 3243 def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None): 3244 self.input = pipein 3245 if pipeout: 3246 pipeout.close() 3247 bb.utils.nonblockingfd(self.input) 3248 self.queue = bytearray() 3249 self.d = d 3250 self.rq = rq 3251 self.rqexec = rqexec 3252 self.fakerootlogs = fakerootlogs 3253 3254 def read(self): 3255 for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]: 3256 for worker in workers.values(): 3257 worker.process.poll() 3258 if worker.process.returncode is not None and not self.rq.teardown: 3259 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode))) 3260 self.rq.finish_runqueue(True) 3261 3262 start = len(self.queue) 3263 try: 3264 self.queue.extend(self.input.read(102400) or b"") 3265 except (OSError, IOError) as e: 3266 if e.errno != errno.EAGAIN: 3267 raise 3268 end = len(self.queue) 3269 found = True 3270 while found and self.queue: 3271 found = False 3272 index = self.queue.find(b"</event>") 3273 while index != -1 and self.queue.startswith(b"<event>"): 3274 try: 3275 event = pickle.loads(self.queue[7:index]) 3276 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3277 if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e): 3278 # The pickled data could contain "</event>" so search for the next occurance 3279 # unpickling again, this should be the only way an unpickle error could occur 3280 index = self.queue.find(b"</event>", index + 1) 3281 continue 3282 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) 3283 bb.event.fire_from_worker(event, self.d) 3284 if isinstance(event, taskUniHashUpdate): 3285 self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) 3286 found = True 3287 self.queue = self.queue[index+8:] 3288 index = self.queue.find(b"</event>") 3289 index = self.queue.find(b"</exitcode>") 3290 while index != -1 and self.queue.startswith(b"<exitcode>"): 3291 try: 3292 task, status = pickle.loads(self.queue[10:index]) 3293 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3294 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) 3295 (_, _, _, taskfn) = split_tid_mcfn(task) 3296 fakerootlog = None 3297 if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs: 3298 fakerootlog = self.fakerootlogs[taskfn] 3299 self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog) 3300 found = True 3301 self.queue = self.queue[index+11:] 3302 index = self.queue.find(b"</exitcode>") 3303 return (end > start) 3304 3305 def close(self): 3306 while self.read(): 3307 continue 3308 if self.queue: 3309 print("Warning, worker left partial message: %s" % self.queue) 3310 self.input.close() 3311 3312def get_setscene_enforce_ignore_tasks(d, targets): 3313 if d.getVar('BB_SETSCENE_ENFORCE') != '1': 3314 return None 3315 ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split() 3316 outlist = [] 3317 for item in ignore_tasks[:]: 3318 if item.startswith('%:'): 3319 for (mc, target, task, fn) in targets: 3320 outlist.append(target + ':' + item.split(':')[1]) 3321 else: 3322 outlist.append(item) 3323 return outlist 3324 3325def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks): 3326 import fnmatch 3327 if ignore_tasks is not None: 3328 item = '%s:%s' % (pn, taskname) 3329 for ignore_tasks in ignore_tasks: 3330 if fnmatch.fnmatch(item, ignore_tasks): 3331 return True 3332 return False 3333 return True 3334