1# 2# Copyright (c) 2013, Intel Corporation. 3# 4# SPDX-License-Identifier: GPL-2.0-only 5# 6# DESCRIPTION 7 8# This module implements the image creation engine used by 'wic' to 9# create images. The engine parses through the OpenEmbedded kickstart 10# (wks) file specified and generates images that can then be directly 11# written onto media. 12# 13# AUTHORS 14# Tom Zanussi <tom.zanussi (at] linux.intel.com> 15# 16 17import logging 18import os 19import tempfile 20import json 21import subprocess 22 23from collections import namedtuple, OrderedDict 24from distutils.spawn import find_executable 25 26from wic import WicError 27from wic.filemap import sparse_copy 28from wic.pluginbase import PluginMgr 29from wic.misc import get_bitbake_var, exec_cmd 30 31logger = logging.getLogger('wic') 32 33def verify_build_env(): 34 """ 35 Verify that the build environment is sane. 36 37 Returns True if it is, false otherwise 38 """ 39 if not os.environ.get("BUILDDIR"): 40 raise WicError("BUILDDIR not found, exiting. (Did you forget to source oe-init-build-env?)") 41 42 return True 43 44 45CANNED_IMAGE_DIR = "lib/wic/canned-wks" # relative to scripts 46SCRIPTS_CANNED_IMAGE_DIR = "scripts/" + CANNED_IMAGE_DIR 47WIC_DIR = "wic" 48 49def build_canned_image_list(path): 50 layers_path = get_bitbake_var("BBLAYERS") 51 canned_wks_layer_dirs = [] 52 53 if layers_path is not None: 54 for layer_path in layers_path.split(): 55 for wks_path in (WIC_DIR, SCRIPTS_CANNED_IMAGE_DIR): 56 cpath = os.path.join(layer_path, wks_path) 57 if os.path.isdir(cpath): 58 canned_wks_layer_dirs.append(cpath) 59 60 cpath = os.path.join(path, CANNED_IMAGE_DIR) 61 canned_wks_layer_dirs.append(cpath) 62 63 return canned_wks_layer_dirs 64 65def find_canned_image(scripts_path, wks_file): 66 """ 67 Find a .wks file with the given name in the canned files dir. 68 69 Return False if not found 70 """ 71 layers_canned_wks_dir = build_canned_image_list(scripts_path) 72 73 for canned_wks_dir in layers_canned_wks_dir: 74 for root, dirs, files in os.walk(canned_wks_dir): 75 for fname in files: 76 if fname.endswith("~") or fname.endswith("#"): 77 continue 78 if ((fname.endswith(".wks") and wks_file + ".wks" == fname) or \ 79 (fname.endswith(".wks.in") and wks_file + ".wks.in" == fname)): 80 fullpath = os.path.join(canned_wks_dir, fname) 81 return fullpath 82 return None 83 84 85def list_canned_images(scripts_path): 86 """ 87 List the .wks files in the canned image dir, minus the extension. 88 """ 89 layers_canned_wks_dir = build_canned_image_list(scripts_path) 90 91 for canned_wks_dir in layers_canned_wks_dir: 92 for root, dirs, files in os.walk(canned_wks_dir): 93 for fname in files: 94 if fname.endswith("~") or fname.endswith("#"): 95 continue 96 if fname.endswith(".wks") or fname.endswith(".wks.in"): 97 fullpath = os.path.join(canned_wks_dir, fname) 98 with open(fullpath) as wks: 99 for line in wks: 100 desc = "" 101 idx = line.find("short-description:") 102 if idx != -1: 103 desc = line[idx + len("short-description:"):].strip() 104 break 105 basename = fname.split('.')[0] 106 print(" %s\t\t%s" % (basename.ljust(30), desc)) 107 108 109def list_canned_image_help(scripts_path, fullpath): 110 """ 111 List the help and params in the specified canned image. 112 """ 113 found = False 114 with open(fullpath) as wks: 115 for line in wks: 116 if not found: 117 idx = line.find("long-description:") 118 if idx != -1: 119 print() 120 print(line[idx + len("long-description:"):].strip()) 121 found = True 122 continue 123 if not line.strip(): 124 break 125 idx = line.find("#") 126 if idx != -1: 127 print(line[idx + len("#:"):].rstrip()) 128 else: 129 break 130 131 132def list_source_plugins(): 133 """ 134 List the available source plugins i.e. plugins available for --source. 135 """ 136 plugins = PluginMgr.get_plugins('source') 137 138 for plugin in plugins: 139 print(" %s" % plugin) 140 141 142def wic_create(wks_file, rootfs_dir, bootimg_dir, kernel_dir, 143 native_sysroot, options): 144 """ 145 Create image 146 147 wks_file - user-defined OE kickstart file 148 rootfs_dir - absolute path to the build's /rootfs dir 149 bootimg_dir - absolute path to the build's boot artifacts directory 150 kernel_dir - absolute path to the build's kernel directory 151 native_sysroot - absolute path to the build's native sysroots dir 152 image_output_dir - dirname to create for image 153 options - wic command line options (debug, bmap, etc) 154 155 Normally, the values for the build artifacts values are determined 156 by 'wic -e' from the output of the 'bitbake -e' command given an 157 image name e.g. 'core-image-minimal' and a given machine set in 158 local.conf. If that's the case, the variables get the following 159 values from the output of 'bitbake -e': 160 161 rootfs_dir: IMAGE_ROOTFS 162 kernel_dir: DEPLOY_DIR_IMAGE 163 native_sysroot: STAGING_DIR_NATIVE 164 165 In the above case, bootimg_dir remains unset and the 166 plugin-specific image creation code is responsible for finding the 167 bootimg artifacts. 168 169 In the case where the values are passed in explicitly i.e 'wic -e' 170 is not used but rather the individual 'wic' options are used to 171 explicitly specify these values. 172 """ 173 try: 174 oe_builddir = os.environ["BUILDDIR"] 175 except KeyError: 176 raise WicError("BUILDDIR not found, exiting. (Did you forget to source oe-init-build-env?)") 177 178 if not os.path.exists(options.outdir): 179 os.makedirs(options.outdir) 180 181 pname = options.imager 182 plugin_class = PluginMgr.get_plugins('imager').get(pname) 183 if not plugin_class: 184 raise WicError('Unknown plugin: %s' % pname) 185 186 plugin = plugin_class(wks_file, rootfs_dir, bootimg_dir, kernel_dir, 187 native_sysroot, oe_builddir, options) 188 189 plugin.do_create() 190 191 logger.info("The image(s) were created using OE kickstart file:\n %s", wks_file) 192 193 194def wic_list(args, scripts_path): 195 """ 196 Print the list of images or source plugins. 197 """ 198 if args.list_type is None: 199 return False 200 201 if args.list_type == "images": 202 203 list_canned_images(scripts_path) 204 return True 205 elif args.list_type == "source-plugins": 206 list_source_plugins() 207 return True 208 elif len(args.help_for) == 1 and args.help_for[0] == 'help': 209 wks_file = args.list_type 210 fullpath = find_canned_image(scripts_path, wks_file) 211 if not fullpath: 212 raise WicError("No image named %s found, exiting. " 213 "(Use 'wic list images' to list available images, " 214 "or specify a fully-qualified OE kickstart (.wks) " 215 "filename)" % wks_file) 216 217 list_canned_image_help(scripts_path, fullpath) 218 return True 219 220 return False 221 222 223class Disk: 224 def __init__(self, imagepath, native_sysroot, fstypes=('fat', 'ext')): 225 self.imagepath = imagepath 226 self.native_sysroot = native_sysroot 227 self.fstypes = fstypes 228 self._partitions = None 229 self._partimages = {} 230 self._lsector_size = None 231 self._psector_size = None 232 self._ptable_format = None 233 234 # find parted 235 # read paths from $PATH environment variable 236 # if it fails, use hardcoded paths 237 pathlist = "/bin:/usr/bin:/usr/sbin:/sbin/" 238 try: 239 self.paths = os.environ['PATH'] + ":" + pathlist 240 except KeyError: 241 self.paths = pathlist 242 243 if native_sysroot: 244 for path in pathlist.split(':'): 245 self.paths = "%s%s:%s" % (native_sysroot, path, self.paths) 246 247 self.parted = find_executable("parted", self.paths) 248 if not self.parted: 249 raise WicError("Can't find executable parted") 250 251 self.partitions = self.get_partitions() 252 253 def __del__(self): 254 for path in self._partimages.values(): 255 os.unlink(path) 256 257 def get_partitions(self): 258 if self._partitions is None: 259 self._partitions = OrderedDict() 260 out = exec_cmd("%s -sm %s unit B print" % (self.parted, self.imagepath)) 261 parttype = namedtuple("Part", "pnum start end size fstype") 262 splitted = out.splitlines() 263 # skip over possible errors in exec_cmd output 264 try: 265 idx =splitted.index("BYT;") 266 except ValueError: 267 raise WicError("Error getting partition information from %s" % (self.parted)) 268 lsector_size, psector_size, self._ptable_format = splitted[idx + 1].split(":")[3:6] 269 self._lsector_size = int(lsector_size) 270 self._psector_size = int(psector_size) 271 for line in splitted[idx + 2:]: 272 pnum, start, end, size, fstype = line.split(':')[:5] 273 partition = parttype(int(pnum), int(start[:-1]), int(end[:-1]), 274 int(size[:-1]), fstype) 275 self._partitions[pnum] = partition 276 277 return self._partitions 278 279 def __getattr__(self, name): 280 """Get path to the executable in a lazy way.""" 281 if name in ("mdir", "mcopy", "mdel", "mdeltree", "sfdisk", "e2fsck", 282 "resize2fs", "mkswap", "mkdosfs", "debugfs"): 283 aname = "_%s" % name 284 if aname not in self.__dict__: 285 setattr(self, aname, find_executable(name, self.paths)) 286 if aname not in self.__dict__ or self.__dict__[aname] is None: 287 raise WicError("Can't find executable '{}'".format(name)) 288 return self.__dict__[aname] 289 return self.__dict__[name] 290 291 def _get_part_image(self, pnum): 292 if pnum not in self.partitions: 293 raise WicError("Partition %s is not in the image") 294 part = self.partitions[pnum] 295 # check if fstype is supported 296 for fstype in self.fstypes: 297 if part.fstype.startswith(fstype): 298 break 299 else: 300 raise WicError("Not supported fstype: {}".format(part.fstype)) 301 if pnum not in self._partimages: 302 tmpf = tempfile.NamedTemporaryFile(prefix="wic-part") 303 dst_fname = tmpf.name 304 tmpf.close() 305 sparse_copy(self.imagepath, dst_fname, skip=part.start, length=part.size) 306 self._partimages[pnum] = dst_fname 307 308 return self._partimages[pnum] 309 310 def _put_part_image(self, pnum): 311 """Put partition image into partitioned image.""" 312 sparse_copy(self._partimages[pnum], self.imagepath, 313 seek=self.partitions[pnum].start) 314 315 def dir(self, pnum, path): 316 if self.partitions[pnum].fstype.startswith('ext'): 317 return exec_cmd("{} {} -R 'ls -l {}'".format(self.debugfs, 318 self._get_part_image(pnum), 319 path), as_shell=True) 320 else: # fat 321 return exec_cmd("{} -i {} ::{}".format(self.mdir, 322 self._get_part_image(pnum), 323 path)) 324 325 def copy(self, src, pnum, path): 326 """Copy partition image into wic image.""" 327 if self.partitions[pnum].fstype.startswith('ext'): 328 cmd = "printf 'cd {}\nwrite {} {}\n' | {} -w {}".\ 329 format(path, src, os.path.basename(src), 330 self.debugfs, self._get_part_image(pnum)) 331 else: # fat 332 cmd = "{} -i {} -snop {} ::{}".format(self.mcopy, 333 self._get_part_image(pnum), 334 src, path) 335 exec_cmd(cmd, as_shell=True) 336 self._put_part_image(pnum) 337 338 def remove(self, pnum, path): 339 """Remove files/dirs from the partition.""" 340 partimg = self._get_part_image(pnum) 341 if self.partitions[pnum].fstype.startswith('ext'): 342 cmd = "{} {} -wR 'rm {}'".format(self.debugfs, 343 self._get_part_image(pnum), 344 path) 345 out = exec_cmd(cmd , as_shell=True) 346 for line in out.splitlines(): 347 if line.startswith("rm:"): 348 if "file is a directory" in line: 349 # Try rmdir to see if this is an empty directory. This won't delete 350 # any non empty directory so let user know about any error that this might 351 # generate. 352 print(exec_cmd("{} {} -wR 'rmdir {}'".format(self.debugfs, 353 self._get_part_image(pnum), 354 path), as_shell=True)) 355 else: 356 raise WicError("Could not complete operation: wic %s" % str(line)) 357 else: # fat 358 cmd = "{} -i {} ::{}".format(self.mdel, partimg, path) 359 try: 360 exec_cmd(cmd) 361 except WicError as err: 362 if "not found" in str(err) or "non empty" in str(err): 363 # mdel outputs 'File ... not found' or 'directory .. non empty" 364 # try to use mdeltree as path could be a directory 365 cmd = "{} -i {} ::{}".format(self.mdeltree, 366 partimg, path) 367 exec_cmd(cmd) 368 else: 369 raise err 370 self._put_part_image(pnum) 371 372 def write(self, target, expand): 373 """Write disk image to the media or file.""" 374 def write_sfdisk_script(outf, parts): 375 for key, val in parts['partitiontable'].items(): 376 if key in ("partitions", "device", "firstlba", "lastlba"): 377 continue 378 if key == "id": 379 key = "label-id" 380 outf.write("{}: {}\n".format(key, val)) 381 outf.write("\n") 382 for part in parts['partitiontable']['partitions']: 383 line = '' 384 for name in ('attrs', 'name', 'size', 'type', 'uuid'): 385 if name == 'size' and part['type'] == 'f': 386 # don't write size for extended partition 387 continue 388 val = part.get(name) 389 if val: 390 line += '{}={}, '.format(name, val) 391 if line: 392 line = line[:-2] # strip ', ' 393 if part.get('bootable'): 394 line += ' ,bootable' 395 outf.write("{}\n".format(line)) 396 outf.flush() 397 398 def read_ptable(path): 399 out = exec_cmd("{} -dJ {}".format(self.sfdisk, path)) 400 return json.loads(out) 401 402 def write_ptable(parts, target): 403 with tempfile.NamedTemporaryFile(prefix="wic-sfdisk-", mode='w') as outf: 404 write_sfdisk_script(outf, parts) 405 cmd = "{} --no-reread {} < {} ".format(self.sfdisk, target, outf.name) 406 exec_cmd(cmd, as_shell=True) 407 408 if expand is None: 409 sparse_copy(self.imagepath, target) 410 else: 411 # copy first sectors that may contain bootloader 412 sparse_copy(self.imagepath, target, length=2048 * self._lsector_size) 413 414 # copy source partition table to the target 415 parts = read_ptable(self.imagepath) 416 write_ptable(parts, target) 417 418 # get size of unpartitioned space 419 free = None 420 for line in exec_cmd("{} -F {}".format(self.sfdisk, target)).splitlines(): 421 if line.startswith("Unpartitioned space ") and line.endswith("sectors"): 422 free = int(line.split()[-2]) 423 # Align free space to a 2048 sector boundary. YOCTO #12840. 424 free = free - (free % 2048) 425 if free is None: 426 raise WicError("Can't get size of unpartitioned space") 427 428 # calculate expanded partitions sizes 429 sizes = {} 430 num_auto_resize = 0 431 for num, part in enumerate(parts['partitiontable']['partitions'], 1): 432 if num in expand: 433 if expand[num] != 0: # don't resize partition if size is set to 0 434 sectors = expand[num] // self._lsector_size 435 free -= sectors - part['size'] 436 part['size'] = sectors 437 sizes[num] = sectors 438 elif part['type'] != 'f': 439 sizes[num] = -1 440 num_auto_resize += 1 441 442 for num, part in enumerate(parts['partitiontable']['partitions'], 1): 443 if sizes.get(num) == -1: 444 part['size'] += free // num_auto_resize 445 446 # write resized partition table to the target 447 write_ptable(parts, target) 448 449 # read resized partition table 450 parts = read_ptable(target) 451 452 # copy partitions content 453 for num, part in enumerate(parts['partitiontable']['partitions'], 1): 454 pnum = str(num) 455 fstype = self.partitions[pnum].fstype 456 457 # copy unchanged partition 458 if part['size'] == self.partitions[pnum].size // self._lsector_size: 459 logger.info("copying unchanged partition {}".format(pnum)) 460 sparse_copy(self._get_part_image(pnum), target, seek=part['start'] * self._lsector_size) 461 continue 462 463 # resize or re-create partitions 464 if fstype.startswith('ext') or fstype.startswith('fat') or \ 465 fstype.startswith('linux-swap'): 466 467 partfname = None 468 with tempfile.NamedTemporaryFile(prefix="wic-part{}-".format(pnum)) as partf: 469 partfname = partf.name 470 471 if fstype.startswith('ext'): 472 logger.info("resizing ext partition {}".format(pnum)) 473 partimg = self._get_part_image(pnum) 474 sparse_copy(partimg, partfname) 475 exec_cmd("{} -pf {}".format(self.e2fsck, partfname)) 476 exec_cmd("{} {} {}s".format(\ 477 self.resize2fs, partfname, part['size'])) 478 elif fstype.startswith('fat'): 479 logger.info("copying content of the fat partition {}".format(pnum)) 480 with tempfile.TemporaryDirectory(prefix='wic-fatdir-') as tmpdir: 481 # copy content to the temporary directory 482 cmd = "{} -snompi {} :: {}".format(self.mcopy, 483 self._get_part_image(pnum), 484 tmpdir) 485 exec_cmd(cmd) 486 # create new msdos partition 487 label = part.get("name") 488 label_str = "-n {}".format(label) if label else '' 489 490 cmd = "{} {} -C {} {}".format(self.mkdosfs, label_str, partfname, 491 part['size']) 492 exec_cmd(cmd) 493 # copy content from the temporary directory to the new partition 494 cmd = "{} -snompi {} {}/* ::".format(self.mcopy, partfname, tmpdir) 495 exec_cmd(cmd, as_shell=True) 496 elif fstype.startswith('linux-swap'): 497 logger.info("creating swap partition {}".format(pnum)) 498 label = part.get("name") 499 label_str = "-L {}".format(label) if label else '' 500 uuid = part.get("uuid") 501 uuid_str = "-U {}".format(uuid) if uuid else '' 502 with open(partfname, 'w') as sparse: 503 os.ftruncate(sparse.fileno(), part['size'] * self._lsector_size) 504 exec_cmd("{} {} {} {}".format(self.mkswap, label_str, uuid_str, partfname)) 505 sparse_copy(partfname, target, seek=part['start'] * self._lsector_size) 506 os.unlink(partfname) 507 elif part['type'] != 'f': 508 logger.warning("skipping partition {}: unsupported fstype {}".format(pnum, fstype)) 509 510def wic_ls(args, native_sysroot): 511 """List contents of partitioned image or vfat partition.""" 512 disk = Disk(args.path.image, native_sysroot) 513 if not args.path.part: 514 if disk.partitions: 515 print('Num Start End Size Fstype') 516 for part in disk.partitions.values(): 517 print("{:2d} {:12d} {:12d} {:12d} {}".format(\ 518 part.pnum, part.start, part.end, 519 part.size, part.fstype)) 520 else: 521 path = args.path.path or '/' 522 print(disk.dir(args.path.part, path)) 523 524def wic_cp(args, native_sysroot): 525 """ 526 Copy local file or directory to the vfat partition of 527 partitioned image. 528 """ 529 disk = Disk(args.dest.image, native_sysroot) 530 disk.copy(args.src, args.dest.part, args.dest.path) 531 532def wic_rm(args, native_sysroot): 533 """ 534 Remove files or directories from the vfat partition of 535 partitioned image. 536 """ 537 disk = Disk(args.path.image, native_sysroot) 538 disk.remove(args.path.part, args.path.path) 539 540def wic_write(args, native_sysroot): 541 """ 542 Write image to a target device. 543 """ 544 disk = Disk(args.image, native_sysroot, ('fat', 'ext', 'swap')) 545 disk.write(args.target, args.expand) 546 547def find_canned(scripts_path, file_name): 548 """ 549 Find a file either by its path or by name in the canned files dir. 550 551 Return None if not found 552 """ 553 if os.path.exists(file_name): 554 return file_name 555 556 layers_canned_wks_dir = build_canned_image_list(scripts_path) 557 for canned_wks_dir in layers_canned_wks_dir: 558 for root, dirs, files in os.walk(canned_wks_dir): 559 for fname in files: 560 if fname == file_name: 561 fullpath = os.path.join(canned_wks_dir, fname) 562 return fullpath 563 564def get_custom_config(boot_file): 565 """ 566 Get the custom configuration to be used for the bootloader. 567 568 Return None if the file can't be found. 569 """ 570 # Get the scripts path of poky 571 scripts_path = os.path.abspath("%s/../.." % os.path.dirname(__file__)) 572 573 cfg_file = find_canned(scripts_path, boot_file) 574 if cfg_file: 575 with open(cfg_file, "r") as f: 576 config = f.read() 577 return config 578