# A simple FAT16 driver that is used to test the `vvfat` driver in QEMU.
#
# Copyright (C) 2024 Amjad Alsharafi
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Callable, List, Optional, Protocol, Set
import string

SECTOR_SIZE = 512
DIRENTRY_SIZE = 32
ALLOWED_FILE_CHARS = set(
    "!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase
)


class MBR:
    def __init__(self, data: bytes):
        assert len(data) == 512
        self.partition_table = []
        for i in range(4):
            partition = data[446 + i * 16 : 446 + (i + 1) * 16]
            # CHS decoding: the top two bits of the sector byte hold
            # bits 8..9 of the cylinder number.
            self.partition_table.append(
                {
                    "status": partition[0],
                    "start_head": partition[1],
                    "start_sector": partition[2] & 0x3F,
                    "start_cylinder": ((partition[2] & 0xC0) << 2)
                    | partition[3],
                    "type": partition[4],
                    "end_head": partition[5],
                    "end_sector": partition[6] & 0x3F,
                    "end_cylinder": ((partition[6] & 0xC0) << 2)
                    | partition[7],
                    "start_lba": int.from_bytes(partition[8:12], "little"),
                    "size": int.from_bytes(partition[12:16], "little"),
                }
            )

    def __str__(self):
        return "\n".join(
            [
                f"{i}: {partition}"
                for i, partition in enumerate(self.partition_table)
            ]
        )


class FatBootSector:
    # pylint: disable=too-many-instance-attributes
    def __init__(self, data: bytes):
        assert len(data) == 512
        self.bytes_per_sector = int.from_bytes(data[11:13], "little")
        self.sectors_per_cluster = data[13]
        self.reserved_sectors = int.from_bytes(data[14:16], "little")
        self.fat_count = data[16]
        self.root_entries = int.from_bytes(data[17:19], "little")
        total_sectors_16 = int.from_bytes(data[19:21], "little")
        self.media_descriptor = data[21]
        self.sectors_per_fat = int.from_bytes(data[22:24], "little")
        self.sectors_per_track = int.from_bytes(data[24:26], "little")
        self.heads = int.from_bytes(data[26:28], "little")
        self.hidden_sectors = int.from_bytes(data[28:32], "little")
        total_sectors_32 = int.from_bytes(data[32:36], "little")
        assert (
            total_sectors_16 == 0 or total_sectors_32 == 0
        ), "Both total sectors (16 and 32) fields are non-zero"
        self.total_sectors = total_sectors_16 or total_sectors_32
        self.drive_number = data[36]
        self.volume_id = int.from_bytes(data[39:43], "little")
        self.volume_label = data[43:54].decode("ascii").strip()
        self.fs_type = data[54:62].decode("ascii").strip()

    def root_dir_start(self):
        """
        Calculate the start sector of the root directory.
        """
        return self.reserved_sectors + self.fat_count * self.sectors_per_fat

    def root_dir_size(self):
        """
        Calculate the size of the root directory in sectors.
        """
        return (
            self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1
        ) // self.bytes_per_sector

    def data_sector_start(self):
        """
        Calculate the start sector of the data region.
        """
        return self.root_dir_start() + self.root_dir_size()

    def first_sector_of_cluster(self, cluster: int) -> int:
        """
        Calculate the first sector of the given cluster.
        """
        return (
            self.data_sector_start()
            + (cluster - 2) * self.sectors_per_cluster
        )
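
    # Worked example of the layout math above, with assumed (not real-image)
    # parameters: 4 reserved sectors, 2 FATs of 16 sectors each, 512 root
    # entries and 512-byte sectors give a root directory starting at sector
    # 4 + 2 * 16 = 36 and spanning (512 * 32) / 512 = 32 sectors, so the data
    # region starts at sector 68 and cluster 2 maps to sector 68.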
""" return self.bytes_per_sector * self.sectors_per_cluster def __str__(self): return ( f"Bytes per sector: {self.bytes_per_sector}\n" f"Sectors per cluster: {self.sectors_per_cluster}\n" f"Reserved sectors: {self.reserved_sectors}\n" f"FAT count: {self.fat_count}\n" f"Root entries: {self.root_entries}\n" f"Total sectors: {self.total_sectors}\n" f"Media descriptor: {self.media_descriptor}\n" f"Sectors per FAT: {self.sectors_per_fat}\n" f"Sectors per track: {self.sectors_per_track}\n" f"Heads: {self.heads}\n" f"Hidden sectors: {self.hidden_sectors}\n" f"Drive number: {self.drive_number}\n" f"Volume ID: {self.volume_id}\n" f"Volume label: {self.volume_label}\n" f"FS type: {self.fs_type}\n" ) class FatDirectoryEntry: # pylint: disable=too-many-instance-attributes def __init__(self, data: bytes, sector: int, offset: int): self.name = data[0:8].decode("ascii").strip() self.ext = data[8:11].decode("ascii").strip() self.attributes = data[11] self.reserved = data[12] self.create_time_tenth = data[13] self.create_time = int.from_bytes(data[14:16], "little") self.create_date = int.from_bytes(data[16:18], "little") self.last_access_date = int.from_bytes(data[18:20], "little") high_cluster = int.from_bytes(data[20:22], "little") self.last_mod_time = int.from_bytes(data[22:24], "little") self.last_mod_date = int.from_bytes(data[24:26], "little") low_cluster = int.from_bytes(data[26:28], "little") self.cluster = (high_cluster << 16) | low_cluster self.size_bytes = int.from_bytes(data[28:32], "little") # extra (to help write back to disk) self.sector = sector self.offset = offset def as_bytes(self) -> bytes: return ( self.name.ljust(8, " ").encode("ascii") + self.ext.ljust(3, " ").encode("ascii") + self.attributes.to_bytes(1, "little") + self.reserved.to_bytes(1, "little") + self.create_time_tenth.to_bytes(1, "little") + self.create_time.to_bytes(2, "little") + self.create_date.to_bytes(2, "little") + self.last_access_date.to_bytes(2, "little") + (self.cluster >> 16).to_bytes(2, "little") + self.last_mod_time.to_bytes(2, "little") + self.last_mod_date.to_bytes(2, "little") + (self.cluster & 0xFFFF).to_bytes(2, "little") + self.size_bytes.to_bytes(4, "little") ) def whole_name(self): if self.ext: return f"{self.name}.{self.ext}" else: return self.name def __str__(self): return ( f"Name: {self.name}\n" f"Ext: {self.ext}\n" f"Attributes: {self.attributes}\n" f"Reserved: {self.reserved}\n" f"Create time tenth: {self.create_time_tenth}\n" f"Create time: {self.create_time}\n" f"Create date: {self.create_date}\n" f"Last access date: {self.last_access_date}\n" f"Last mod time: {self.last_mod_time}\n" f"Last mod date: {self.last_mod_date}\n" f"Cluster: {self.cluster}\n" f"Size: {self.size_bytes}\n" ) def __repr__(self): # convert to dict return str(vars(self)) class SectorReader(Protocol): def __call__(self, start_sector: int, num_sectors: int = 1) -> bytes: ... 


# pylint: disable=broad-exception-raised
class Fat16:
    def __init__(
        self,
        start_sector: int,
        size: int,
        sector_reader: SectorReader,
        sector_writer: Callable[[int, bytes], None]
    ):
        self.start_sector = start_sector
        self.size_in_sectors = size
        self.sector_reader = sector_reader
        self.sector_writer = sector_writer

        self.boot_sector = FatBootSector(
            self.sector_reader(start_sector, 1)
        )

        fat_size_in_sectors = (
            self.boot_sector.sectors_per_fat * self.boot_sector.fat_count
        )
        self.fats = self.read_sectors(
            self.boot_sector.reserved_sectors, fat_size_in_sectors
        )
        self.fats_dirty_sectors: Set[int] = set()

    def read_sectors(self, start_sector: int, num_sectors: int) -> bytes:
        return self.sector_reader(
            start_sector + self.start_sector, num_sectors
        )

    def write_sectors(self, start_sector: int, data: bytes) -> None:
        return self.sector_writer(start_sector + self.start_sector, data)

    def directory_from_bytes(
        self, data: bytes, start_sector: int
    ) -> List[FatDirectoryEntry]:
        """
        Convert `bytes` into a list of `FatDirectoryEntry` objects.
        Will ignore long file names.
        Will stop when it encounters a 0x00 byte.
        """
        entries = []
        for i in range(0, len(data), DIRENTRY_SIZE):
            entry = data[i : i + DIRENTRY_SIZE]

            current_sector = start_sector + (i // SECTOR_SIZE)
            current_offset = i % SECTOR_SIZE

            if entry[0] == 0:
                break

            if entry[0] == 0xE5:
                # Deleted file
                continue

            if entry[11] & 0xF == 0xF:
                # Long file name
                continue

            entries.append(
                FatDirectoryEntry(entry, current_sector, current_offset)
            )
        return entries

    def read_root_directory(self) -> List[FatDirectoryEntry]:
        root_dir = self.read_sectors(
            self.boot_sector.root_dir_start(),
            self.boot_sector.root_dir_size(),
        )
        return self.directory_from_bytes(
            root_dir, self.boot_sector.root_dir_start()
        )

    def read_fat_entry(self, cluster: int) -> int:
        """
        Read the FAT entry for the given cluster.
        """
        fat_offset = cluster * 2  # FAT16
        return int.from_bytes(
            self.fats[fat_offset : fat_offset + 2], "little"
        )

    def write_fat_entry(self, cluster: int, value: int) -> None:
        """
        Write the FAT entry for the given cluster.
        """
        fat_offset = cluster * 2
        self.fats = (
            self.fats[:fat_offset]
            + value.to_bytes(2, "little")
            + self.fats[fat_offset + 2 :]
        )
        self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE)

    def flush_fats(self) -> None:
        """
        Write the FATs back to the disk.
        """
        for sector in self.fats_dirty_sectors:
            data = self.fats[
                sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE
            ]
            sector = self.boot_sector.reserved_sectors + sector
            self.write_sectors(sector, data)
        self.fats_dirty_sectors = set()

    def next_cluster(self, cluster: int) -> Optional[int]:
        """
        Get the next cluster in the chain.
        If it's `None`, then it's the last cluster.
        The function will raise an exception if the next cluster
        is `FREE` (unexpected) or an invalid entry.
        """
        fat_entry = self.read_fat_entry(cluster)
        if fat_entry == 0:
            raise Exception("Unexpected: FREE cluster")
        if fat_entry == 1:
            raise Exception("Unexpected: RESERVED cluster")
        if fat_entry >= 0xFFF8:
            # 0xFFF8..0xFFFF mark the end of the chain
            return None
        if fat_entry >= 0xFFF7:
            # 0xFFF7 marks a bad cluster
            raise Exception("Invalid FAT entry")
        return fat_entry

    def next_free_cluster(self) -> int:
        """
        Find the next free cluster.
        """
        # simple linear search
        for i in range(2, 0xFFFF):
            if self.read_fat_entry(i) == 0:
                return i
        raise Exception("No free clusters")

    def next_free_cluster_non_continuous(self) -> int:
        """
        Find the next free cluster, making sure that the clusters
        before and after it are not allocated.
""" # simple linear search before = False for i in range(2, 0xFFFF): if self.read_fat_entry(i) == 0: if before and self.read_fat_entry(i + 1) == 0: return i else: before = True else: before = False raise Exception("No free clusters") def read_cluster(self, cluster: int) -> bytes: """ Read the cluster at the given cluster. """ return self.read_sectors( self.boot_sector.first_sector_of_cluster(cluster), self.boot_sector.sectors_per_cluster, ) def write_cluster(self, cluster: int, data: bytes) -> None: """ Write the cluster at the given cluster. """ assert len(data) == self.boot_sector.cluster_bytes() self.write_sectors( self.boot_sector.first_sector_of_cluster(cluster), data, ) def read_directory( self, cluster: Optional[int] ) -> List[FatDirectoryEntry]: """ Read the directory at the given cluster. """ entries = [] while cluster is not None: data = self.read_cluster(cluster) entries.extend( self.directory_from_bytes( data, self.boot_sector.first_sector_of_cluster(cluster) ) ) cluster = self.next_cluster(cluster) return entries def add_direntry( self, cluster: Optional[int], name: str, ext: str, attributes: int ) -> FatDirectoryEntry: """ Add a new directory entry to the given cluster. If the cluster is `None`, then it will be added to the root directory. """ def find_free_entry(data: bytes) -> Optional[int]: for i in range(0, len(data), DIRENTRY_SIZE): entry = data[i : i + DIRENTRY_SIZE] if entry[0] == 0 or entry[0] == 0xE5: return i return None assert len(name) <= 8, "Name must be 8 characters or less" assert len(ext) <= 3, "Ext must be 3 characters or less" assert attributes % 0x15 != 0x15, "Invalid attributes" # initial dummy data new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0) new_entry.name = name.ljust(8, " ") new_entry.ext = ext.ljust(3, " ") new_entry.attributes = attributes new_entry.reserved = 0 new_entry.create_time_tenth = 0 new_entry.create_time = 0 new_entry.create_date = 0 new_entry.last_access_date = 0 new_entry.last_mod_time = 0 new_entry.last_mod_date = 0 new_entry.cluster = self.next_free_cluster() new_entry.size_bytes = 0 # mark as EOF self.write_fat_entry(new_entry.cluster, 0xFFFF) if cluster is None: for i in range(self.boot_sector.root_dir_size()): sector_data = self.read_sectors( self.boot_sector.root_dir_start() + i, 1 ) offset = find_free_entry(sector_data) if offset is not None: new_entry.sector = self.boot_sector.root_dir_start() + i new_entry.offset = offset self.update_direntry(new_entry) return new_entry else: while cluster is not None: data = self.read_cluster(cluster) offset = find_free_entry(data) if offset is not None: new_entry.sector = ( self.boot_sector.first_sector_of_cluster(cluster) + (offset // SECTOR_SIZE)) new_entry.offset = offset % SECTOR_SIZE self.update_direntry(new_entry) return new_entry cluster = self.next_cluster(cluster) raise Exception("No free directory entries") def update_direntry(self, entry: FatDirectoryEntry) -> None: """ Write the directory entry back to the disk. """ sector = self.read_sectors(entry.sector, 1) sector = ( sector[: entry.offset] + entry.as_bytes() + sector[entry.offset + DIRENTRY_SIZE :] ) self.write_sectors(entry.sector, sector) def find_direntry(self, path: str) -> Optional[FatDirectoryEntry]: """ Find the directory entry for the given path. 
""" assert path[0] == "/", "Path must start with /" path = path[1:] # remove the leading / parts = path.split("/") directory = self.read_root_directory() current_entry = None for i, part in enumerate(parts): is_last = i == len(parts) - 1 for entry in directory: if entry.whole_name() == part: current_entry = entry break if current_entry is None: return None if is_last: return current_entry if current_entry.attributes & 0x10 == 0: raise Exception( f"{current_entry.whole_name()} is not a directory" ) directory = self.read_directory(current_entry.cluster) assert False, "Exited loop with is_last == False" def read_file(self, entry: Optional[FatDirectoryEntry]) -> Optional[bytes]: """ Read the content of the file at the given path. """ if entry is None: return None if entry.attributes & 0x10 != 0: raise Exception(f"{entry.whole_name()} is a directory") data = b"" cluster: Optional[int] = entry.cluster while cluster is not None and len(data) <= entry.size_bytes: data += self.read_cluster(cluster) cluster = self.next_cluster(cluster) return data[: entry.size_bytes] def truncate_file( self, entry: FatDirectoryEntry, new_size: int, allocate_non_continuous: bool = False, ) -> None: """ Truncate the file at the given path to the new size. """ if entry is None: raise Exception("entry is None") if entry.attributes & 0x10 != 0: raise Exception(f"{entry.whole_name()} is a directory") def clusters_from_size(size: int) -> int: return ( size + self.boot_sector.cluster_bytes() - 1 ) // self.boot_sector.cluster_bytes() # First, allocate new FATs if we need to required_clusters = clusters_from_size(new_size) current_clusters = clusters_from_size(entry.size_bytes) affected_clusters = set() # Keep at least one cluster, easier to manage this way if required_clusters == 0: required_clusters = 1 if current_clusters == 0: current_clusters = 1 cluster: Optional[int] if required_clusters > current_clusters: # Allocate new clusters cluster = entry.cluster to_add = required_clusters for _ in range(current_clusters - 1): to_add -= 1 assert cluster is not None, "Cluster is None" affected_clusters.add(cluster) cluster = self.next_cluster(cluster) assert required_clusters > 0, "No new clusters to allocate" assert cluster is not None, "Cluster is None" assert ( self.next_cluster(cluster) is None ), "Cluster is not the last cluster" # Allocate new clusters for _ in range(to_add - 1): if allocate_non_continuous: new_cluster = self.next_free_cluster_non_continuous() else: new_cluster = self.next_free_cluster() self.write_fat_entry(cluster, new_cluster) self.write_fat_entry(new_cluster, 0xFFFF) cluster = new_cluster elif required_clusters < current_clusters: # Truncate the file cluster = entry.cluster for _ in range(required_clusters - 1): assert cluster is not None, "Cluster is None" cluster = self.next_cluster(cluster) assert cluster is not None, "Cluster is None" next_cluster = self.next_cluster(cluster) # mark last as EOF self.write_fat_entry(cluster, 0xFFFF) # free the rest while next_cluster is not None: cluster = next_cluster next_cluster = self.next_cluster(next_cluster) self.write_fat_entry(cluster, 0) self.flush_fats() # verify number of clusters cluster = entry.cluster count = 0 while cluster is not None: count += 1 affected_clusters.add(cluster) cluster = self.next_cluster(cluster) assert ( count == required_clusters ), f"Expected {required_clusters} clusters, got {count}" # update the size entry.size_bytes = new_size self.update_direntry(entry) # trigger every affected cluster for cluster in affected_clusters: 
            first_sector = self.boot_sector.first_sector_of_cluster(cluster)
            first_sector_data = self.read_sectors(first_sector, 1)
            self.write_sectors(first_sector, first_sector_data)

    def write_file(self, entry: FatDirectoryEntry, data: bytes) -> None:
        """
        Write `data` as the content of the file represented by the given
        entry.
        """
        if entry is None:
            raise Exception("entry is None")
        if entry.attributes & 0x10 != 0:
            raise Exception(f"{entry.whole_name()} is a directory")

        data_len = len(data)

        self.truncate_file(entry, data_len)

        cluster: Optional[int] = entry.cluster
        while cluster is not None:
            data_to_write = data[: self.boot_sector.cluster_bytes()]
            if len(data_to_write) < self.boot_sector.cluster_bytes():
                old_data = self.read_cluster(cluster)
                data_to_write += old_data[len(data_to_write) :]

            self.write_cluster(cluster, data_to_write)
            data = data[self.boot_sector.cluster_bytes() :]
            if len(data) == 0:
                break
            cluster = self.next_cluster(cluster)

        assert (
            len(data) == 0
        ), "Data was not written completely, clusters missing"

    def create_file(self, path: str) -> Optional[FatDirectoryEntry]:
        """
        Create a new file at the given path.
        """
        assert path[0] == "/", "Path must start with /"

        path = path[1:]  # remove the leading /

        parts = path.split("/")

        directory_cluster = None
        directory = self.read_root_directory()

        parts, filename = parts[:-1], parts[-1]

        for part in parts:
            current_entry = None
            for entry in directory:
                if entry.whole_name() == part:
                    current_entry = entry
                    break
            if current_entry is None:
                return None

            if current_entry.attributes & 0x10 == 0:
                raise Exception(
                    f"{current_entry.whole_name()} is not a directory"
                )

            directory = self.read_directory(current_entry.cluster)
            directory_cluster = current_entry.cluster

        # add the new entry to the parent directory
        filename, ext = filename.split(".")
        if len(ext) > 3:
            raise Exception("Ext must be 3 characters or less")
        if len(filename) > 8:
            raise Exception("Name must be 8 characters or less")

        for c in filename + ext:
            if c not in ALLOWED_FILE_CHARS:
                raise Exception("Invalid character in filename")

        return self.add_direntry(directory_cluster, filename, ext, 0)
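

# Illustrative usage sketch (comments only, not exercised by the iotests):
# given a raw MBR disk image whose first partition holds a FAT16 filesystem,
# the driver could be wired up roughly as below. The image name and the file
# path are assumptions made up for the example, and `make_image_sector_io` is
# the illustrative helper defined above.
#
#     read_sectors, write_sectors = make_image_sector_io("disk.img")
#     mbr = MBR(read_sectors(0, 1))
#     part = mbr.partition_table[0]
#     fs = Fat16(part["start_lba"], part["size"], read_sectors, write_sectors)
#     entry = fs.find_direntry("/HELLO.TXT")
#     if entry is not None:
#         print(fs.read_file(entry))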