1# 2# Copyright (c) 2013, Intel Corporation. 3# 4# SPDX-License-Identifier: GPL-2.0-only 5# 6# DESCRIPTION 7 8# This module implements the image creation engine used by 'wic' to 9# create images. The engine parses through the OpenEmbedded kickstart 10# (wks) file specified and generates images that can then be directly 11# written onto media. 12# 13# AUTHORS 14# Tom Zanussi <tom.zanussi (at] linux.intel.com> 15# 16 17import logging 18import os 19import tempfile 20import json 21import subprocess 22 23from collections import namedtuple, OrderedDict 24from distutils.spawn import find_executable 25 26from wic import WicError 27from wic.filemap import sparse_copy 28from wic.pluginbase import PluginMgr 29from wic.misc import get_bitbake_var, exec_cmd 30 31logger = logging.getLogger('wic') 32 33def verify_build_env(): 34 """ 35 Verify that the build environment is sane. 36 37 Returns True if it is, false otherwise 38 """ 39 if not os.environ.get("BUILDDIR"): 40 raise WicError("BUILDDIR not found, exiting. (Did you forget to source oe-init-build-env?)") 41 42 return True 43 44 45CANNED_IMAGE_DIR = "lib/wic/canned-wks" # relative to scripts 46SCRIPTS_CANNED_IMAGE_DIR = "scripts/" + CANNED_IMAGE_DIR 47WIC_DIR = "wic" 48 49def build_canned_image_list(path): 50 layers_path = get_bitbake_var("BBLAYERS") 51 canned_wks_layer_dirs = [] 52 53 if layers_path is not None: 54 for layer_path in layers_path.split(): 55 for wks_path in (WIC_DIR, SCRIPTS_CANNED_IMAGE_DIR): 56 cpath = os.path.join(layer_path, wks_path) 57 if os.path.isdir(cpath): 58 canned_wks_layer_dirs.append(cpath) 59 60 cpath = os.path.join(path, CANNED_IMAGE_DIR) 61 canned_wks_layer_dirs.append(cpath) 62 63 return canned_wks_layer_dirs 64 65def find_canned_image(scripts_path, wks_file): 66 """ 67 Find a .wks file with the given name in the canned files dir. 68 69 Return False if not found 70 """ 71 layers_canned_wks_dir = build_canned_image_list(scripts_path) 72 73 for canned_wks_dir in layers_canned_wks_dir: 74 for root, dirs, files in os.walk(canned_wks_dir): 75 for fname in files: 76 if fname.endswith("~") or fname.endswith("#"): 77 continue 78 if fname.endswith(".wks") and wks_file + ".wks" == fname: 79 fullpath = os.path.join(canned_wks_dir, fname) 80 return fullpath 81 return None 82 83 84def list_canned_images(scripts_path): 85 """ 86 List the .wks files in the canned image dir, minus the extension. 87 """ 88 layers_canned_wks_dir = build_canned_image_list(scripts_path) 89 90 for canned_wks_dir in layers_canned_wks_dir: 91 for root, dirs, files in os.walk(canned_wks_dir): 92 for fname in files: 93 if fname.endswith("~") or fname.endswith("#"): 94 continue 95 if fname.endswith(".wks"): 96 fullpath = os.path.join(canned_wks_dir, fname) 97 with open(fullpath) as wks: 98 for line in wks: 99 desc = "" 100 idx = line.find("short-description:") 101 if idx != -1: 102 desc = line[idx + len("short-description:"):].strip() 103 break 104 basename = os.path.splitext(fname)[0] 105 print(" %s\t\t%s" % (basename.ljust(30), desc)) 106 107 108def list_canned_image_help(scripts_path, fullpath): 109 """ 110 List the help and params in the specified canned image. 111 """ 112 found = False 113 with open(fullpath) as wks: 114 for line in wks: 115 if not found: 116 idx = line.find("long-description:") 117 if idx != -1: 118 print() 119 print(line[idx + len("long-description:"):].strip()) 120 found = True 121 continue 122 if not line.strip(): 123 break 124 idx = line.find("#") 125 if idx != -1: 126 print(line[idx + len("#:"):].rstrip()) 127 else: 128 break 129 130 131def list_source_plugins(): 132 """ 133 List the available source plugins i.e. plugins available for --source. 134 """ 135 plugins = PluginMgr.get_plugins('source') 136 137 for plugin in plugins: 138 print(" %s" % plugin) 139 140 141def wic_create(wks_file, rootfs_dir, bootimg_dir, kernel_dir, 142 native_sysroot, options): 143 """ 144 Create image 145 146 wks_file - user-defined OE kickstart file 147 rootfs_dir - absolute path to the build's /rootfs dir 148 bootimg_dir - absolute path to the build's boot artifacts directory 149 kernel_dir - absolute path to the build's kernel directory 150 native_sysroot - absolute path to the build's native sysroots dir 151 image_output_dir - dirname to create for image 152 options - wic command line options (debug, bmap, etc) 153 154 Normally, the values for the build artifacts values are determined 155 by 'wic -e' from the output of the 'bitbake -e' command given an 156 image name e.g. 'core-image-minimal' and a given machine set in 157 local.conf. If that's the case, the variables get the following 158 values from the output of 'bitbake -e': 159 160 rootfs_dir: IMAGE_ROOTFS 161 kernel_dir: DEPLOY_DIR_IMAGE 162 native_sysroot: STAGING_DIR_NATIVE 163 164 In the above case, bootimg_dir remains unset and the 165 plugin-specific image creation code is responsible for finding the 166 bootimg artifacts. 167 168 In the case where the values are passed in explicitly i.e 'wic -e' 169 is not used but rather the individual 'wic' options are used to 170 explicitly specify these values. 171 """ 172 try: 173 oe_builddir = os.environ["BUILDDIR"] 174 except KeyError: 175 raise WicError("BUILDDIR not found, exiting. (Did you forget to source oe-init-build-env?)") 176 177 if not os.path.exists(options.outdir): 178 os.makedirs(options.outdir) 179 180 pname = options.imager 181 plugin_class = PluginMgr.get_plugins('imager').get(pname) 182 if not plugin_class: 183 raise WicError('Unknown plugin: %s' % pname) 184 185 plugin = plugin_class(wks_file, rootfs_dir, bootimg_dir, kernel_dir, 186 native_sysroot, oe_builddir, options) 187 188 plugin.do_create() 189 190 logger.info("The image(s) were created using OE kickstart file:\n %s", wks_file) 191 192 193def wic_list(args, scripts_path): 194 """ 195 Print the list of images or source plugins. 196 """ 197 if args.list_type is None: 198 return False 199 200 if args.list_type == "images": 201 202 list_canned_images(scripts_path) 203 return True 204 elif args.list_type == "source-plugins": 205 list_source_plugins() 206 return True 207 elif len(args.help_for) == 1 and args.help_for[0] == 'help': 208 wks_file = args.list_type 209 fullpath = find_canned_image(scripts_path, wks_file) 210 if not fullpath: 211 raise WicError("No image named %s found, exiting. " 212 "(Use 'wic list images' to list available images, " 213 "or specify a fully-qualified OE kickstart (.wks) " 214 "filename)" % wks_file) 215 216 list_canned_image_help(scripts_path, fullpath) 217 return True 218 219 return False 220 221 222class Disk: 223 def __init__(self, imagepath, native_sysroot, fstypes=('fat', 'ext')): 224 self.imagepath = imagepath 225 self.native_sysroot = native_sysroot 226 self.fstypes = fstypes 227 self._partitions = None 228 self._partimages = {} 229 self._lsector_size = None 230 self._psector_size = None 231 self._ptable_format = None 232 233 # find parted 234 # read paths from $PATH environment variable 235 # if it fails, use hardcoded paths 236 pathlist = "/bin:/usr/bin:/usr/sbin:/sbin/" 237 try: 238 self.paths = os.environ['PATH'] + ":" + pathlist 239 except KeyError: 240 self.paths = pathlist 241 242 if native_sysroot: 243 for path in pathlist.split(':'): 244 self.paths = "%s%s:%s" % (native_sysroot, path, self.paths) 245 246 self.parted = find_executable("parted", self.paths) 247 if not self.parted: 248 raise WicError("Can't find executable parted") 249 250 self.partitions = self.get_partitions() 251 252 def __del__(self): 253 for path in self._partimages.values(): 254 os.unlink(path) 255 256 def get_partitions(self): 257 if self._partitions is None: 258 self._partitions = OrderedDict() 259 out = exec_cmd("%s -sm %s unit B print" % (self.parted, self.imagepath)) 260 parttype = namedtuple("Part", "pnum start end size fstype") 261 splitted = out.splitlines() 262 # skip over possible errors in exec_cmd output 263 try: 264 idx =splitted.index("BYT;") 265 except ValueError: 266 raise WicError("Error getting partition information from %s" % (self.parted)) 267 lsector_size, psector_size, self._ptable_format = splitted[idx + 1].split(":")[3:6] 268 self._lsector_size = int(lsector_size) 269 self._psector_size = int(psector_size) 270 for line in splitted[idx + 2:]: 271 pnum, start, end, size, fstype = line.split(':')[:5] 272 partition = parttype(int(pnum), int(start[:-1]), int(end[:-1]), 273 int(size[:-1]), fstype) 274 self._partitions[pnum] = partition 275 276 return self._partitions 277 278 def __getattr__(self, name): 279 """Get path to the executable in a lazy way.""" 280 if name in ("mdir", "mcopy", "mdel", "mdeltree", "sfdisk", "e2fsck", 281 "resize2fs", "mkswap", "mkdosfs", "debugfs"): 282 aname = "_%s" % name 283 if aname not in self.__dict__: 284 setattr(self, aname, find_executable(name, self.paths)) 285 if aname not in self.__dict__ or self.__dict__[aname] is None: 286 raise WicError("Can't find executable '{}'".format(name)) 287 return self.__dict__[aname] 288 return self.__dict__[name] 289 290 def _get_part_image(self, pnum): 291 if pnum not in self.partitions: 292 raise WicError("Partition %s is not in the image") 293 part = self.partitions[pnum] 294 # check if fstype is supported 295 for fstype in self.fstypes: 296 if part.fstype.startswith(fstype): 297 break 298 else: 299 raise WicError("Not supported fstype: {}".format(part.fstype)) 300 if pnum not in self._partimages: 301 tmpf = tempfile.NamedTemporaryFile(prefix="wic-part") 302 dst_fname = tmpf.name 303 tmpf.close() 304 sparse_copy(self.imagepath, dst_fname, skip=part.start, length=part.size) 305 self._partimages[pnum] = dst_fname 306 307 return self._partimages[pnum] 308 309 def _put_part_image(self, pnum): 310 """Put partition image into partitioned image.""" 311 sparse_copy(self._partimages[pnum], self.imagepath, 312 seek=self.partitions[pnum].start) 313 314 def dir(self, pnum, path): 315 if self.partitions[pnum].fstype.startswith('ext'): 316 return exec_cmd("{} {} -R 'ls -l {}'".format(self.debugfs, 317 self._get_part_image(pnum), 318 path), as_shell=True) 319 else: # fat 320 return exec_cmd("{} -i {} ::{}".format(self.mdir, 321 self._get_part_image(pnum), 322 path)) 323 324 def copy(self, src, pnum, path): 325 """Copy partition image into wic image.""" 326 if self.partitions[pnum].fstype.startswith('ext'): 327 cmd = "printf 'cd {}\nwrite {} {}\n' | {} -w {}".\ 328 format(path, src, os.path.basename(src), 329 self.debugfs, self._get_part_image(pnum)) 330 else: # fat 331 cmd = "{} -i {} -snop {} ::{}".format(self.mcopy, 332 self._get_part_image(pnum), 333 src, path) 334 exec_cmd(cmd, as_shell=True) 335 self._put_part_image(pnum) 336 337 def remove(self, pnum, path): 338 """Remove files/dirs from the partition.""" 339 partimg = self._get_part_image(pnum) 340 if self.partitions[pnum].fstype.startswith('ext'): 341 cmd = "{} {} -wR 'rm {}'".format(self.debugfs, 342 self._get_part_image(pnum), 343 path) 344 out = exec_cmd(cmd , as_shell=True) 345 for line in out.splitlines(): 346 if line.startswith("rm:"): 347 if "file is a directory" in line: 348 # Try rmdir to see if this is an empty directory. This won't delete 349 # any non empty directory so let user know about any error that this might 350 # generate. 351 print(exec_cmd("{} {} -wR 'rmdir {}'".format(self.debugfs, 352 self._get_part_image(pnum), 353 path), as_shell=True)) 354 else: 355 raise WicError("Could not complete operation: wic %s" % str(line)) 356 else: # fat 357 cmd = "{} -i {} ::{}".format(self.mdel, partimg, path) 358 try: 359 exec_cmd(cmd) 360 except WicError as err: 361 if "not found" in str(err) or "non empty" in str(err): 362 # mdel outputs 'File ... not found' or 'directory .. non empty" 363 # try to use mdeltree as path could be a directory 364 cmd = "{} -i {} ::{}".format(self.mdeltree, 365 partimg, path) 366 exec_cmd(cmd) 367 else: 368 raise err 369 self._put_part_image(pnum) 370 371 def write(self, target, expand): 372 """Write disk image to the media or file.""" 373 def write_sfdisk_script(outf, parts): 374 for key, val in parts['partitiontable'].items(): 375 if key in ("partitions", "device", "firstlba", "lastlba"): 376 continue 377 if key == "id": 378 key = "label-id" 379 outf.write("{}: {}\n".format(key, val)) 380 outf.write("\n") 381 for part in parts['partitiontable']['partitions']: 382 line = '' 383 for name in ('attrs', 'name', 'size', 'type', 'uuid'): 384 if name == 'size' and part['type'] == 'f': 385 # don't write size for extended partition 386 continue 387 val = part.get(name) 388 if val: 389 line += '{}={}, '.format(name, val) 390 if line: 391 line = line[:-2] # strip ', ' 392 if part.get('bootable'): 393 line += ' ,bootable' 394 outf.write("{}\n".format(line)) 395 outf.flush() 396 397 def read_ptable(path): 398 out = exec_cmd("{} -dJ {}".format(self.sfdisk, path)) 399 return json.loads(out) 400 401 def write_ptable(parts, target): 402 with tempfile.NamedTemporaryFile(prefix="wic-sfdisk-", mode='w') as outf: 403 write_sfdisk_script(outf, parts) 404 cmd = "{} --no-reread {} < {} ".format(self.sfdisk, target, outf.name) 405 exec_cmd(cmd, as_shell=True) 406 407 if expand is None: 408 sparse_copy(self.imagepath, target) 409 else: 410 # copy first sectors that may contain bootloader 411 sparse_copy(self.imagepath, target, length=2048 * self._lsector_size) 412 413 # copy source partition table to the target 414 parts = read_ptable(self.imagepath) 415 write_ptable(parts, target) 416 417 # get size of unpartitioned space 418 free = None 419 for line in exec_cmd("{} -F {}".format(self.sfdisk, target)).splitlines(): 420 if line.startswith("Unpartitioned space ") and line.endswith("sectors"): 421 free = int(line.split()[-2]) 422 # Align free space to a 2048 sector boundary. YOCTO #12840. 423 free = free - (free % 2048) 424 if free is None: 425 raise WicError("Can't get size of unpartitioned space") 426 427 # calculate expanded partitions sizes 428 sizes = {} 429 num_auto_resize = 0 430 for num, part in enumerate(parts['partitiontable']['partitions'], 1): 431 if num in expand: 432 if expand[num] != 0: # don't resize partition if size is set to 0 433 sectors = expand[num] // self._lsector_size 434 free -= sectors - part['size'] 435 part['size'] = sectors 436 sizes[num] = sectors 437 elif part['type'] != 'f': 438 sizes[num] = -1 439 num_auto_resize += 1 440 441 for num, part in enumerate(parts['partitiontable']['partitions'], 1): 442 if sizes.get(num) == -1: 443 part['size'] += free // num_auto_resize 444 445 # write resized partition table to the target 446 write_ptable(parts, target) 447 448 # read resized partition table 449 parts = read_ptable(target) 450 451 # copy partitions content 452 for num, part in enumerate(parts['partitiontable']['partitions'], 1): 453 pnum = str(num) 454 fstype = self.partitions[pnum].fstype 455 456 # copy unchanged partition 457 if part['size'] == self.partitions[pnum].size // self._lsector_size: 458 logger.info("copying unchanged partition {}".format(pnum)) 459 sparse_copy(self._get_part_image(pnum), target, seek=part['start'] * self._lsector_size) 460 continue 461 462 # resize or re-create partitions 463 if fstype.startswith('ext') or fstype.startswith('fat') or \ 464 fstype.startswith('linux-swap'): 465 466 partfname = None 467 with tempfile.NamedTemporaryFile(prefix="wic-part{}-".format(pnum)) as partf: 468 partfname = partf.name 469 470 if fstype.startswith('ext'): 471 logger.info("resizing ext partition {}".format(pnum)) 472 partimg = self._get_part_image(pnum) 473 sparse_copy(partimg, partfname) 474 exec_cmd("{} -pf {}".format(self.e2fsck, partfname)) 475 exec_cmd("{} {} {}s".format(\ 476 self.resize2fs, partfname, part['size'])) 477 elif fstype.startswith('fat'): 478 logger.info("copying content of the fat partition {}".format(pnum)) 479 with tempfile.TemporaryDirectory(prefix='wic-fatdir-') as tmpdir: 480 # copy content to the temporary directory 481 cmd = "{} -snompi {} :: {}".format(self.mcopy, 482 self._get_part_image(pnum), 483 tmpdir) 484 exec_cmd(cmd) 485 # create new msdos partition 486 label = part.get("name") 487 label_str = "-n {}".format(label) if label else '' 488 489 cmd = "{} {} -C {} {}".format(self.mkdosfs, label_str, partfname, 490 part['size']) 491 exec_cmd(cmd) 492 # copy content from the temporary directory to the new partition 493 cmd = "{} -snompi {} {}/* ::".format(self.mcopy, partfname, tmpdir) 494 exec_cmd(cmd, as_shell=True) 495 elif fstype.startswith('linux-swap'): 496 logger.info("creating swap partition {}".format(pnum)) 497 label = part.get("name") 498 label_str = "-L {}".format(label) if label else '' 499 uuid = part.get("uuid") 500 uuid_str = "-U {}".format(uuid) if uuid else '' 501 with open(partfname, 'w') as sparse: 502 os.ftruncate(sparse.fileno(), part['size'] * self._lsector_size) 503 exec_cmd("{} {} {} {}".format(self.mkswap, label_str, uuid_str, partfname)) 504 sparse_copy(partfname, target, seek=part['start'] * self._lsector_size) 505 os.unlink(partfname) 506 elif part['type'] != 'f': 507 logger.warning("skipping partition {}: unsupported fstype {}".format(pnum, fstype)) 508 509def wic_ls(args, native_sysroot): 510 """List contents of partitioned image or vfat partition.""" 511 disk = Disk(args.path.image, native_sysroot) 512 if not args.path.part: 513 if disk.partitions: 514 print('Num Start End Size Fstype') 515 for part in disk.partitions.values(): 516 print("{:2d} {:12d} {:12d} {:12d} {}".format(\ 517 part.pnum, part.start, part.end, 518 part.size, part.fstype)) 519 else: 520 path = args.path.path or '/' 521 print(disk.dir(args.path.part, path)) 522 523def wic_cp(args, native_sysroot): 524 """ 525 Copy local file or directory to the vfat partition of 526 partitioned image. 527 """ 528 disk = Disk(args.dest.image, native_sysroot) 529 disk.copy(args.src, args.dest.part, args.dest.path) 530 531def wic_rm(args, native_sysroot): 532 """ 533 Remove files or directories from the vfat partition of 534 partitioned image. 535 """ 536 disk = Disk(args.path.image, native_sysroot) 537 disk.remove(args.path.part, args.path.path) 538 539def wic_write(args, native_sysroot): 540 """ 541 Write image to a target device. 542 """ 543 disk = Disk(args.image, native_sysroot, ('fat', 'ext', 'swap')) 544 disk.write(args.target, args.expand) 545 546def find_canned(scripts_path, file_name): 547 """ 548 Find a file either by its path or by name in the canned files dir. 549 550 Return None if not found 551 """ 552 if os.path.exists(file_name): 553 return file_name 554 555 layers_canned_wks_dir = build_canned_image_list(scripts_path) 556 for canned_wks_dir in layers_canned_wks_dir: 557 for root, dirs, files in os.walk(canned_wks_dir): 558 for fname in files: 559 if fname == file_name: 560 fullpath = os.path.join(canned_wks_dir, fname) 561 return fullpath 562 563def get_custom_config(boot_file): 564 """ 565 Get the custom configuration to be used for the bootloader. 566 567 Return None if the file can't be found. 568 """ 569 # Get the scripts path of poky 570 scripts_path = os.path.abspath("%s/../.." % os.path.dirname(__file__)) 571 572 cfg_file = find_canned(scripts_path, boot_file) 573 if cfg_file: 574 with open(cfg_file, "r") as f: 575 config = f.read() 576 return config 577