1*c8f60bfbSAmjad Alsharafi# A simple FAT16 driver that is used to test the `vvfat` driver in QEMU. 2*c8f60bfbSAmjad Alsharafi# 3*c8f60bfbSAmjad Alsharafi# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com> 4*c8f60bfbSAmjad Alsharafi# 5*c8f60bfbSAmjad Alsharafi# This program is free software; you can redistribute it and/or modify 6*c8f60bfbSAmjad Alsharafi# it under the terms of the GNU General Public License as published by 7*c8f60bfbSAmjad Alsharafi# the Free Software Foundation; either version 2 of the License, or 8*c8f60bfbSAmjad Alsharafi# (at your option) any later version. 9*c8f60bfbSAmjad Alsharafi# 10*c8f60bfbSAmjad Alsharafi# This program is distributed in the hope that it will be useful, 11*c8f60bfbSAmjad Alsharafi# but WITHOUT ANY WARRANTY; without even the implied warranty of 12*c8f60bfbSAmjad Alsharafi# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13*c8f60bfbSAmjad Alsharafi# GNU General Public License for more details. 14*c8f60bfbSAmjad Alsharafi# 15*c8f60bfbSAmjad Alsharafi# You should have received a copy of the GNU General Public License 16*c8f60bfbSAmjad Alsharafi# along with this program. If not, see <http://www.gnu.org/licenses/>. 17*c8f60bfbSAmjad Alsharafi 18*c8f60bfbSAmjad Alsharafifrom typing import Callable, List, Optional, Protocol, Set 19*c8f60bfbSAmjad Alsharafiimport string 20*c8f60bfbSAmjad Alsharafi 21*c8f60bfbSAmjad AlsharafiSECTOR_SIZE = 512 22*c8f60bfbSAmjad AlsharafiDIRENTRY_SIZE = 32 23*c8f60bfbSAmjad AlsharafiALLOWED_FILE_CHARS = set( 24*c8f60bfbSAmjad Alsharafi "!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase 25*c8f60bfbSAmjad Alsharafi) 26*c8f60bfbSAmjad Alsharafi 27*c8f60bfbSAmjad Alsharafi 28*c8f60bfbSAmjad Alsharaficlass MBR: 29*c8f60bfbSAmjad Alsharafi def __init__(self, data: bytes): 30*c8f60bfbSAmjad Alsharafi assert len(data) == 512 31*c8f60bfbSAmjad Alsharafi self.partition_table = [] 32*c8f60bfbSAmjad Alsharafi for i in range(4): 33*c8f60bfbSAmjad Alsharafi partition = data[446 + i * 16 : 446 + (i + 1) * 16] 34*c8f60bfbSAmjad Alsharafi self.partition_table.append( 35*c8f60bfbSAmjad Alsharafi { 36*c8f60bfbSAmjad Alsharafi "status": partition[0], 37*c8f60bfbSAmjad Alsharafi "start_head": partition[1], 38*c8f60bfbSAmjad Alsharafi "start_sector": partition[2] & 0x3F, 39*c8f60bfbSAmjad Alsharafi "start_cylinder": ((partition[2] & 0xC0) << 2) 40*c8f60bfbSAmjad Alsharafi | partition[3], 41*c8f60bfbSAmjad Alsharafi "type": partition[4], 42*c8f60bfbSAmjad Alsharafi "end_head": partition[5], 43*c8f60bfbSAmjad Alsharafi "end_sector": partition[6] & 0x3F, 44*c8f60bfbSAmjad Alsharafi "end_cylinder": ((partition[6] & 0xC0) << 2) 45*c8f60bfbSAmjad Alsharafi | partition[7], 46*c8f60bfbSAmjad Alsharafi "start_lba": int.from_bytes(partition[8:12], "little"), 47*c8f60bfbSAmjad Alsharafi "size": int.from_bytes(partition[12:16], "little"), 48*c8f60bfbSAmjad Alsharafi } 49*c8f60bfbSAmjad Alsharafi ) 50*c8f60bfbSAmjad Alsharafi 51*c8f60bfbSAmjad Alsharafi def __str__(self): 52*c8f60bfbSAmjad Alsharafi return "\n".join( 53*c8f60bfbSAmjad Alsharafi [ 54*c8f60bfbSAmjad Alsharafi f"{i}: {partition}" 55*c8f60bfbSAmjad Alsharafi for i, partition in enumerate(self.partition_table) 56*c8f60bfbSAmjad Alsharafi ] 57*c8f60bfbSAmjad Alsharafi ) 58*c8f60bfbSAmjad Alsharafi 59*c8f60bfbSAmjad Alsharafi 60*c8f60bfbSAmjad Alsharaficlass FatBootSector: 61*c8f60bfbSAmjad Alsharafi # pylint: disable=too-many-instance-attributes 62*c8f60bfbSAmjad Alsharafi def __init__(self, data: bytes): 63*c8f60bfbSAmjad Alsharafi assert len(data) == 512 64*c8f60bfbSAmjad Alsharafi self.bytes_per_sector = int.from_bytes(data[11:13], "little") 65*c8f60bfbSAmjad Alsharafi self.sectors_per_cluster = data[13] 66*c8f60bfbSAmjad Alsharafi self.reserved_sectors = int.from_bytes(data[14:16], "little") 67*c8f60bfbSAmjad Alsharafi self.fat_count = data[16] 68*c8f60bfbSAmjad Alsharafi self.root_entries = int.from_bytes(data[17:19], "little") 69*c8f60bfbSAmjad Alsharafi total_sectors_16 = int.from_bytes(data[19:21], "little") 70*c8f60bfbSAmjad Alsharafi self.media_descriptor = data[21] 71*c8f60bfbSAmjad Alsharafi self.sectors_per_fat = int.from_bytes(data[22:24], "little") 72*c8f60bfbSAmjad Alsharafi self.sectors_per_track = int.from_bytes(data[24:26], "little") 73*c8f60bfbSAmjad Alsharafi self.heads = int.from_bytes(data[26:28], "little") 74*c8f60bfbSAmjad Alsharafi self.hidden_sectors = int.from_bytes(data[28:32], "little") 75*c8f60bfbSAmjad Alsharafi total_sectors_32 = int.from_bytes(data[32:36], "little") 76*c8f60bfbSAmjad Alsharafi assert ( 77*c8f60bfbSAmjad Alsharafi total_sectors_16 == 0 or total_sectors_32 == 0 78*c8f60bfbSAmjad Alsharafi ), "Both total sectors (16 and 32) fields are non-zero" 79*c8f60bfbSAmjad Alsharafi self.total_sectors = total_sectors_16 or total_sectors_32 80*c8f60bfbSAmjad Alsharafi self.drive_number = data[36] 81*c8f60bfbSAmjad Alsharafi self.volume_id = int.from_bytes(data[39:43], "little") 82*c8f60bfbSAmjad Alsharafi self.volume_label = data[43:54].decode("ascii").strip() 83*c8f60bfbSAmjad Alsharafi self.fs_type = data[54:62].decode("ascii").strip() 84*c8f60bfbSAmjad Alsharafi 85*c8f60bfbSAmjad Alsharafi def root_dir_start(self): 86*c8f60bfbSAmjad Alsharafi """ 87*c8f60bfbSAmjad Alsharafi Calculate the start sector of the root directory. 88*c8f60bfbSAmjad Alsharafi """ 89*c8f60bfbSAmjad Alsharafi return self.reserved_sectors + self.fat_count * self.sectors_per_fat 90*c8f60bfbSAmjad Alsharafi 91*c8f60bfbSAmjad Alsharafi def root_dir_size(self): 92*c8f60bfbSAmjad Alsharafi """ 93*c8f60bfbSAmjad Alsharafi Calculate the size of the root directory in sectors. 94*c8f60bfbSAmjad Alsharafi """ 95*c8f60bfbSAmjad Alsharafi return ( 96*c8f60bfbSAmjad Alsharafi self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1 97*c8f60bfbSAmjad Alsharafi ) // self.bytes_per_sector 98*c8f60bfbSAmjad Alsharafi 99*c8f60bfbSAmjad Alsharafi def data_sector_start(self): 100*c8f60bfbSAmjad Alsharafi """ 101*c8f60bfbSAmjad Alsharafi Calculate the start sector of the data region. 102*c8f60bfbSAmjad Alsharafi """ 103*c8f60bfbSAmjad Alsharafi return self.root_dir_start() + self.root_dir_size() 104*c8f60bfbSAmjad Alsharafi 105*c8f60bfbSAmjad Alsharafi def first_sector_of_cluster(self, cluster: int) -> int: 106*c8f60bfbSAmjad Alsharafi """ 107*c8f60bfbSAmjad Alsharafi Calculate the first sector of the given cluster. 108*c8f60bfbSAmjad Alsharafi """ 109*c8f60bfbSAmjad Alsharafi return ( 110*c8f60bfbSAmjad Alsharafi self.data_sector_start() + (cluster - 2) * self.sectors_per_cluster 111*c8f60bfbSAmjad Alsharafi ) 112*c8f60bfbSAmjad Alsharafi 113*c8f60bfbSAmjad Alsharafi def cluster_bytes(self): 114*c8f60bfbSAmjad Alsharafi """ 115*c8f60bfbSAmjad Alsharafi Calculate the number of bytes in a cluster. 116*c8f60bfbSAmjad Alsharafi """ 117*c8f60bfbSAmjad Alsharafi return self.bytes_per_sector * self.sectors_per_cluster 118*c8f60bfbSAmjad Alsharafi 119*c8f60bfbSAmjad Alsharafi def __str__(self): 120*c8f60bfbSAmjad Alsharafi return ( 121*c8f60bfbSAmjad Alsharafi f"Bytes per sector: {self.bytes_per_sector}\n" 122*c8f60bfbSAmjad Alsharafi f"Sectors per cluster: {self.sectors_per_cluster}\n" 123*c8f60bfbSAmjad Alsharafi f"Reserved sectors: {self.reserved_sectors}\n" 124*c8f60bfbSAmjad Alsharafi f"FAT count: {self.fat_count}\n" 125*c8f60bfbSAmjad Alsharafi f"Root entries: {self.root_entries}\n" 126*c8f60bfbSAmjad Alsharafi f"Total sectors: {self.total_sectors}\n" 127*c8f60bfbSAmjad Alsharafi f"Media descriptor: {self.media_descriptor}\n" 128*c8f60bfbSAmjad Alsharafi f"Sectors per FAT: {self.sectors_per_fat}\n" 129*c8f60bfbSAmjad Alsharafi f"Sectors per track: {self.sectors_per_track}\n" 130*c8f60bfbSAmjad Alsharafi f"Heads: {self.heads}\n" 131*c8f60bfbSAmjad Alsharafi f"Hidden sectors: {self.hidden_sectors}\n" 132*c8f60bfbSAmjad Alsharafi f"Drive number: {self.drive_number}\n" 133*c8f60bfbSAmjad Alsharafi f"Volume ID: {self.volume_id}\n" 134*c8f60bfbSAmjad Alsharafi f"Volume label: {self.volume_label}\n" 135*c8f60bfbSAmjad Alsharafi f"FS type: {self.fs_type}\n" 136*c8f60bfbSAmjad Alsharafi ) 137*c8f60bfbSAmjad Alsharafi 138*c8f60bfbSAmjad Alsharafi 139*c8f60bfbSAmjad Alsharaficlass FatDirectoryEntry: 140*c8f60bfbSAmjad Alsharafi # pylint: disable=too-many-instance-attributes 141*c8f60bfbSAmjad Alsharafi def __init__(self, data: bytes, sector: int, offset: int): 142*c8f60bfbSAmjad Alsharafi self.name = data[0:8].decode("ascii").strip() 143*c8f60bfbSAmjad Alsharafi self.ext = data[8:11].decode("ascii").strip() 144*c8f60bfbSAmjad Alsharafi self.attributes = data[11] 145*c8f60bfbSAmjad Alsharafi self.reserved = data[12] 146*c8f60bfbSAmjad Alsharafi self.create_time_tenth = data[13] 147*c8f60bfbSAmjad Alsharafi self.create_time = int.from_bytes(data[14:16], "little") 148*c8f60bfbSAmjad Alsharafi self.create_date = int.from_bytes(data[16:18], "little") 149*c8f60bfbSAmjad Alsharafi self.last_access_date = int.from_bytes(data[18:20], "little") 150*c8f60bfbSAmjad Alsharafi high_cluster = int.from_bytes(data[20:22], "little") 151*c8f60bfbSAmjad Alsharafi self.last_mod_time = int.from_bytes(data[22:24], "little") 152*c8f60bfbSAmjad Alsharafi self.last_mod_date = int.from_bytes(data[24:26], "little") 153*c8f60bfbSAmjad Alsharafi low_cluster = int.from_bytes(data[26:28], "little") 154*c8f60bfbSAmjad Alsharafi self.cluster = (high_cluster << 16) | low_cluster 155*c8f60bfbSAmjad Alsharafi self.size_bytes = int.from_bytes(data[28:32], "little") 156*c8f60bfbSAmjad Alsharafi 157*c8f60bfbSAmjad Alsharafi # extra (to help write back to disk) 158*c8f60bfbSAmjad Alsharafi self.sector = sector 159*c8f60bfbSAmjad Alsharafi self.offset = offset 160*c8f60bfbSAmjad Alsharafi 161*c8f60bfbSAmjad Alsharafi def as_bytes(self) -> bytes: 162*c8f60bfbSAmjad Alsharafi return ( 163*c8f60bfbSAmjad Alsharafi self.name.ljust(8, " ").encode("ascii") 164*c8f60bfbSAmjad Alsharafi + self.ext.ljust(3, " ").encode("ascii") 165*c8f60bfbSAmjad Alsharafi + self.attributes.to_bytes(1, "little") 166*c8f60bfbSAmjad Alsharafi + self.reserved.to_bytes(1, "little") 167*c8f60bfbSAmjad Alsharafi + self.create_time_tenth.to_bytes(1, "little") 168*c8f60bfbSAmjad Alsharafi + self.create_time.to_bytes(2, "little") 169*c8f60bfbSAmjad Alsharafi + self.create_date.to_bytes(2, "little") 170*c8f60bfbSAmjad Alsharafi + self.last_access_date.to_bytes(2, "little") 171*c8f60bfbSAmjad Alsharafi + (self.cluster >> 16).to_bytes(2, "little") 172*c8f60bfbSAmjad Alsharafi + self.last_mod_time.to_bytes(2, "little") 173*c8f60bfbSAmjad Alsharafi + self.last_mod_date.to_bytes(2, "little") 174*c8f60bfbSAmjad Alsharafi + (self.cluster & 0xFFFF).to_bytes(2, "little") 175*c8f60bfbSAmjad Alsharafi + self.size_bytes.to_bytes(4, "little") 176*c8f60bfbSAmjad Alsharafi ) 177*c8f60bfbSAmjad Alsharafi 178*c8f60bfbSAmjad Alsharafi def whole_name(self): 179*c8f60bfbSAmjad Alsharafi if self.ext: 180*c8f60bfbSAmjad Alsharafi return f"{self.name}.{self.ext}" 181*c8f60bfbSAmjad Alsharafi else: 182*c8f60bfbSAmjad Alsharafi return self.name 183*c8f60bfbSAmjad Alsharafi 184*c8f60bfbSAmjad Alsharafi def __str__(self): 185*c8f60bfbSAmjad Alsharafi return ( 186*c8f60bfbSAmjad Alsharafi f"Name: {self.name}\n" 187*c8f60bfbSAmjad Alsharafi f"Ext: {self.ext}\n" 188*c8f60bfbSAmjad Alsharafi f"Attributes: {self.attributes}\n" 189*c8f60bfbSAmjad Alsharafi f"Reserved: {self.reserved}\n" 190*c8f60bfbSAmjad Alsharafi f"Create time tenth: {self.create_time_tenth}\n" 191*c8f60bfbSAmjad Alsharafi f"Create time: {self.create_time}\n" 192*c8f60bfbSAmjad Alsharafi f"Create date: {self.create_date}\n" 193*c8f60bfbSAmjad Alsharafi f"Last access date: {self.last_access_date}\n" 194*c8f60bfbSAmjad Alsharafi f"Last mod time: {self.last_mod_time}\n" 195*c8f60bfbSAmjad Alsharafi f"Last mod date: {self.last_mod_date}\n" 196*c8f60bfbSAmjad Alsharafi f"Cluster: {self.cluster}\n" 197*c8f60bfbSAmjad Alsharafi f"Size: {self.size_bytes}\n" 198*c8f60bfbSAmjad Alsharafi ) 199*c8f60bfbSAmjad Alsharafi 200*c8f60bfbSAmjad Alsharafi def __repr__(self): 201*c8f60bfbSAmjad Alsharafi # convert to dict 202*c8f60bfbSAmjad Alsharafi return str(vars(self)) 203*c8f60bfbSAmjad Alsharafi 204*c8f60bfbSAmjad Alsharafi 205*c8f60bfbSAmjad Alsharaficlass SectorReader(Protocol): 206*c8f60bfbSAmjad Alsharafi def __call__(self, start_sector: int, num_sectors: int = 1) -> bytes: ... 207*c8f60bfbSAmjad Alsharafi 208*c8f60bfbSAmjad Alsharafi# pylint: disable=broad-exception-raised 209*c8f60bfbSAmjad Alsharaficlass Fat16: 210*c8f60bfbSAmjad Alsharafi def __init__( 211*c8f60bfbSAmjad Alsharafi self, 212*c8f60bfbSAmjad Alsharafi start_sector: int, 213*c8f60bfbSAmjad Alsharafi size: int, 214*c8f60bfbSAmjad Alsharafi sector_reader: SectorReader, 215*c8f60bfbSAmjad Alsharafi sector_writer: Callable[[int, bytes], None] 216*c8f60bfbSAmjad Alsharafi ): 217*c8f60bfbSAmjad Alsharafi self.start_sector = start_sector 218*c8f60bfbSAmjad Alsharafi self.size_in_sectors = size 219*c8f60bfbSAmjad Alsharafi self.sector_reader = sector_reader 220*c8f60bfbSAmjad Alsharafi self.sector_writer = sector_writer 221*c8f60bfbSAmjad Alsharafi 222*c8f60bfbSAmjad Alsharafi self.boot_sector = FatBootSector(self.sector_reader(start_sector, 1)) 223*c8f60bfbSAmjad Alsharafi 224*c8f60bfbSAmjad Alsharafi fat_size_in_sectors = ( 225*c8f60bfbSAmjad Alsharafi self.boot_sector.sectors_per_fat * self.boot_sector.fat_count 226*c8f60bfbSAmjad Alsharafi ) 227*c8f60bfbSAmjad Alsharafi self.fats = self.read_sectors( 228*c8f60bfbSAmjad Alsharafi self.boot_sector.reserved_sectors, fat_size_in_sectors 229*c8f60bfbSAmjad Alsharafi ) 230*c8f60bfbSAmjad Alsharafi self.fats_dirty_sectors: Set[int] = set() 231*c8f60bfbSAmjad Alsharafi 232*c8f60bfbSAmjad Alsharafi def read_sectors(self, start_sector: int, num_sectors: int) -> bytes: 233*c8f60bfbSAmjad Alsharafi return self.sector_reader(start_sector + self.start_sector, 234*c8f60bfbSAmjad Alsharafi num_sectors) 235*c8f60bfbSAmjad Alsharafi 236*c8f60bfbSAmjad Alsharafi def write_sectors(self, start_sector: int, data: bytes) -> None: 237*c8f60bfbSAmjad Alsharafi return self.sector_writer(start_sector + self.start_sector, data) 238*c8f60bfbSAmjad Alsharafi 239*c8f60bfbSAmjad Alsharafi def directory_from_bytes( 240*c8f60bfbSAmjad Alsharafi self, data: bytes, start_sector: int 241*c8f60bfbSAmjad Alsharafi ) -> List[FatDirectoryEntry]: 242*c8f60bfbSAmjad Alsharafi """ 243*c8f60bfbSAmjad Alsharafi Convert `bytes` into a list of `FatDirectoryEntry` objects. 244*c8f60bfbSAmjad Alsharafi Will ignore long file names. 245*c8f60bfbSAmjad Alsharafi Will stop when it encounters a 0x00 byte. 246*c8f60bfbSAmjad Alsharafi """ 247*c8f60bfbSAmjad Alsharafi 248*c8f60bfbSAmjad Alsharafi entries = [] 249*c8f60bfbSAmjad Alsharafi for i in range(0, len(data), DIRENTRY_SIZE): 250*c8f60bfbSAmjad Alsharafi entry = data[i : i + DIRENTRY_SIZE] 251*c8f60bfbSAmjad Alsharafi 252*c8f60bfbSAmjad Alsharafi current_sector = start_sector + (i // SECTOR_SIZE) 253*c8f60bfbSAmjad Alsharafi current_offset = i % SECTOR_SIZE 254*c8f60bfbSAmjad Alsharafi 255*c8f60bfbSAmjad Alsharafi if entry[0] == 0: 256*c8f60bfbSAmjad Alsharafi break 257*c8f60bfbSAmjad Alsharafi 258*c8f60bfbSAmjad Alsharafi if entry[0] == 0xE5: 259*c8f60bfbSAmjad Alsharafi # Deleted file 260*c8f60bfbSAmjad Alsharafi continue 261*c8f60bfbSAmjad Alsharafi 262*c8f60bfbSAmjad Alsharafi if entry[11] & 0xF == 0xF: 263*c8f60bfbSAmjad Alsharafi # Long file name 264*c8f60bfbSAmjad Alsharafi continue 265*c8f60bfbSAmjad Alsharafi 266*c8f60bfbSAmjad Alsharafi entries.append( 267*c8f60bfbSAmjad Alsharafi FatDirectoryEntry(entry, current_sector, current_offset) 268*c8f60bfbSAmjad Alsharafi ) 269*c8f60bfbSAmjad Alsharafi return entries 270*c8f60bfbSAmjad Alsharafi 271*c8f60bfbSAmjad Alsharafi def read_root_directory(self) -> List[FatDirectoryEntry]: 272*c8f60bfbSAmjad Alsharafi root_dir = self.read_sectors( 273*c8f60bfbSAmjad Alsharafi self.boot_sector.root_dir_start(), self.boot_sector.root_dir_size() 274*c8f60bfbSAmjad Alsharafi ) 275*c8f60bfbSAmjad Alsharafi return self.directory_from_bytes( 276*c8f60bfbSAmjad Alsharafi root_dir, self.boot_sector.root_dir_start() 277*c8f60bfbSAmjad Alsharafi ) 278*c8f60bfbSAmjad Alsharafi 279*c8f60bfbSAmjad Alsharafi def read_fat_entry(self, cluster: int) -> int: 280*c8f60bfbSAmjad Alsharafi """ 281*c8f60bfbSAmjad Alsharafi Read the FAT entry for the given cluster. 282*c8f60bfbSAmjad Alsharafi """ 283*c8f60bfbSAmjad Alsharafi fat_offset = cluster * 2 # FAT16 284*c8f60bfbSAmjad Alsharafi return int.from_bytes(self.fats[fat_offset : fat_offset + 2], "little") 285*c8f60bfbSAmjad Alsharafi 286*c8f60bfbSAmjad Alsharafi def write_fat_entry(self, cluster: int, value: int) -> None: 287*c8f60bfbSAmjad Alsharafi """ 288*c8f60bfbSAmjad Alsharafi Write the FAT entry for the given cluster. 289*c8f60bfbSAmjad Alsharafi """ 290*c8f60bfbSAmjad Alsharafi fat_offset = cluster * 2 291*c8f60bfbSAmjad Alsharafi self.fats = ( 292*c8f60bfbSAmjad Alsharafi self.fats[:fat_offset] 293*c8f60bfbSAmjad Alsharafi + value.to_bytes(2, "little") 294*c8f60bfbSAmjad Alsharafi + self.fats[fat_offset + 2 :] 295*c8f60bfbSAmjad Alsharafi ) 296*c8f60bfbSAmjad Alsharafi self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE) 297*c8f60bfbSAmjad Alsharafi 298*c8f60bfbSAmjad Alsharafi def flush_fats(self) -> None: 299*c8f60bfbSAmjad Alsharafi """ 300*c8f60bfbSAmjad Alsharafi Write the FATs back to the disk. 301*c8f60bfbSAmjad Alsharafi """ 302*c8f60bfbSAmjad Alsharafi for sector in self.fats_dirty_sectors: 303*c8f60bfbSAmjad Alsharafi data = self.fats[sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE] 304*c8f60bfbSAmjad Alsharafi sector = self.boot_sector.reserved_sectors + sector 305*c8f60bfbSAmjad Alsharafi self.write_sectors(sector, data) 306*c8f60bfbSAmjad Alsharafi self.fats_dirty_sectors = set() 307*c8f60bfbSAmjad Alsharafi 308*c8f60bfbSAmjad Alsharafi def next_cluster(self, cluster: int) -> Optional[int]: 309*c8f60bfbSAmjad Alsharafi """ 310*c8f60bfbSAmjad Alsharafi Get the next cluster in the chain. 311*c8f60bfbSAmjad Alsharafi If its `None`, then its the last cluster. 312*c8f60bfbSAmjad Alsharafi The function will crash if the next cluster 313*c8f60bfbSAmjad Alsharafi is `FREE` (unexpected) or invalid entry. 314*c8f60bfbSAmjad Alsharafi """ 315*c8f60bfbSAmjad Alsharafi fat_entry = self.read_fat_entry(cluster) 316*c8f60bfbSAmjad Alsharafi if fat_entry == 0: 317*c8f60bfbSAmjad Alsharafi raise Exception("Unexpected: FREE cluster") 318*c8f60bfbSAmjad Alsharafi if fat_entry == 1: 319*c8f60bfbSAmjad Alsharafi raise Exception("Unexpected: RESERVED cluster") 320*c8f60bfbSAmjad Alsharafi if fat_entry >= 0xFFF8: 321*c8f60bfbSAmjad Alsharafi return None 322*c8f60bfbSAmjad Alsharafi if fat_entry >= 0xFFF7: 323*c8f60bfbSAmjad Alsharafi raise Exception("Invalid FAT entry") 324*c8f60bfbSAmjad Alsharafi 325*c8f60bfbSAmjad Alsharafi return fat_entry 326*c8f60bfbSAmjad Alsharafi 327*c8f60bfbSAmjad Alsharafi def next_free_cluster(self) -> int: 328*c8f60bfbSAmjad Alsharafi """ 329*c8f60bfbSAmjad Alsharafi Find the next free cluster. 330*c8f60bfbSAmjad Alsharafi """ 331*c8f60bfbSAmjad Alsharafi # simple linear search 332*c8f60bfbSAmjad Alsharafi for i in range(2, 0xFFFF): 333*c8f60bfbSAmjad Alsharafi if self.read_fat_entry(i) == 0: 334*c8f60bfbSAmjad Alsharafi return i 335*c8f60bfbSAmjad Alsharafi raise Exception("No free clusters") 336*c8f60bfbSAmjad Alsharafi 337*c8f60bfbSAmjad Alsharafi def next_free_cluster_non_continuous(self) -> int: 338*c8f60bfbSAmjad Alsharafi """ 339*c8f60bfbSAmjad Alsharafi Find the next free cluster, but makes sure 340*c8f60bfbSAmjad Alsharafi that the cluster before and after it are not allocated. 341*c8f60bfbSAmjad Alsharafi """ 342*c8f60bfbSAmjad Alsharafi # simple linear search 343*c8f60bfbSAmjad Alsharafi before = False 344*c8f60bfbSAmjad Alsharafi for i in range(2, 0xFFFF): 345*c8f60bfbSAmjad Alsharafi if self.read_fat_entry(i) == 0: 346*c8f60bfbSAmjad Alsharafi if before and self.read_fat_entry(i + 1) == 0: 347*c8f60bfbSAmjad Alsharafi return i 348*c8f60bfbSAmjad Alsharafi else: 349*c8f60bfbSAmjad Alsharafi before = True 350*c8f60bfbSAmjad Alsharafi else: 351*c8f60bfbSAmjad Alsharafi before = False 352*c8f60bfbSAmjad Alsharafi 353*c8f60bfbSAmjad Alsharafi raise Exception("No free clusters") 354*c8f60bfbSAmjad Alsharafi 355*c8f60bfbSAmjad Alsharafi def read_cluster(self, cluster: int) -> bytes: 356*c8f60bfbSAmjad Alsharafi """ 357*c8f60bfbSAmjad Alsharafi Read the cluster at the given cluster. 358*c8f60bfbSAmjad Alsharafi """ 359*c8f60bfbSAmjad Alsharafi return self.read_sectors( 360*c8f60bfbSAmjad Alsharafi self.boot_sector.first_sector_of_cluster(cluster), 361*c8f60bfbSAmjad Alsharafi self.boot_sector.sectors_per_cluster, 362*c8f60bfbSAmjad Alsharafi ) 363*c8f60bfbSAmjad Alsharafi 364*c8f60bfbSAmjad Alsharafi def write_cluster(self, cluster: int, data: bytes) -> None: 365*c8f60bfbSAmjad Alsharafi """ 366*c8f60bfbSAmjad Alsharafi Write the cluster at the given cluster. 367*c8f60bfbSAmjad Alsharafi """ 368*c8f60bfbSAmjad Alsharafi assert len(data) == self.boot_sector.cluster_bytes() 369*c8f60bfbSAmjad Alsharafi self.write_sectors( 370*c8f60bfbSAmjad Alsharafi self.boot_sector.first_sector_of_cluster(cluster), 371*c8f60bfbSAmjad Alsharafi data, 372*c8f60bfbSAmjad Alsharafi ) 373*c8f60bfbSAmjad Alsharafi 374*c8f60bfbSAmjad Alsharafi def read_directory( 375*c8f60bfbSAmjad Alsharafi self, cluster: Optional[int] 376*c8f60bfbSAmjad Alsharafi ) -> List[FatDirectoryEntry]: 377*c8f60bfbSAmjad Alsharafi """ 378*c8f60bfbSAmjad Alsharafi Read the directory at the given cluster. 379*c8f60bfbSAmjad Alsharafi """ 380*c8f60bfbSAmjad Alsharafi entries = [] 381*c8f60bfbSAmjad Alsharafi while cluster is not None: 382*c8f60bfbSAmjad Alsharafi data = self.read_cluster(cluster) 383*c8f60bfbSAmjad Alsharafi entries.extend( 384*c8f60bfbSAmjad Alsharafi self.directory_from_bytes( 385*c8f60bfbSAmjad Alsharafi data, self.boot_sector.first_sector_of_cluster(cluster) 386*c8f60bfbSAmjad Alsharafi ) 387*c8f60bfbSAmjad Alsharafi ) 388*c8f60bfbSAmjad Alsharafi cluster = self.next_cluster(cluster) 389*c8f60bfbSAmjad Alsharafi return entries 390*c8f60bfbSAmjad Alsharafi 391*c8f60bfbSAmjad Alsharafi def add_direntry( 392*c8f60bfbSAmjad Alsharafi self, cluster: Optional[int], name: str, ext: str, attributes: int 393*c8f60bfbSAmjad Alsharafi ) -> FatDirectoryEntry: 394*c8f60bfbSAmjad Alsharafi """ 395*c8f60bfbSAmjad Alsharafi Add a new directory entry to the given cluster. 396*c8f60bfbSAmjad Alsharafi If the cluster is `None`, then it will be added to the root directory. 397*c8f60bfbSAmjad Alsharafi """ 398*c8f60bfbSAmjad Alsharafi 399*c8f60bfbSAmjad Alsharafi def find_free_entry(data: bytes) -> Optional[int]: 400*c8f60bfbSAmjad Alsharafi for i in range(0, len(data), DIRENTRY_SIZE): 401*c8f60bfbSAmjad Alsharafi entry = data[i : i + DIRENTRY_SIZE] 402*c8f60bfbSAmjad Alsharafi if entry[0] == 0 or entry[0] == 0xE5: 403*c8f60bfbSAmjad Alsharafi return i 404*c8f60bfbSAmjad Alsharafi return None 405*c8f60bfbSAmjad Alsharafi 406*c8f60bfbSAmjad Alsharafi assert len(name) <= 8, "Name must be 8 characters or less" 407*c8f60bfbSAmjad Alsharafi assert len(ext) <= 3, "Ext must be 3 characters or less" 408*c8f60bfbSAmjad Alsharafi assert attributes % 0x15 != 0x15, "Invalid attributes" 409*c8f60bfbSAmjad Alsharafi 410*c8f60bfbSAmjad Alsharafi # initial dummy data 411*c8f60bfbSAmjad Alsharafi new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0) 412*c8f60bfbSAmjad Alsharafi new_entry.name = name.ljust(8, " ") 413*c8f60bfbSAmjad Alsharafi new_entry.ext = ext.ljust(3, " ") 414*c8f60bfbSAmjad Alsharafi new_entry.attributes = attributes 415*c8f60bfbSAmjad Alsharafi new_entry.reserved = 0 416*c8f60bfbSAmjad Alsharafi new_entry.create_time_tenth = 0 417*c8f60bfbSAmjad Alsharafi new_entry.create_time = 0 418*c8f60bfbSAmjad Alsharafi new_entry.create_date = 0 419*c8f60bfbSAmjad Alsharafi new_entry.last_access_date = 0 420*c8f60bfbSAmjad Alsharafi new_entry.last_mod_time = 0 421*c8f60bfbSAmjad Alsharafi new_entry.last_mod_date = 0 422*c8f60bfbSAmjad Alsharafi new_entry.cluster = self.next_free_cluster() 423*c8f60bfbSAmjad Alsharafi new_entry.size_bytes = 0 424*c8f60bfbSAmjad Alsharafi 425*c8f60bfbSAmjad Alsharafi # mark as EOF 426*c8f60bfbSAmjad Alsharafi self.write_fat_entry(new_entry.cluster, 0xFFFF) 427*c8f60bfbSAmjad Alsharafi 428*c8f60bfbSAmjad Alsharafi if cluster is None: 429*c8f60bfbSAmjad Alsharafi for i in range(self.boot_sector.root_dir_size()): 430*c8f60bfbSAmjad Alsharafi sector_data = self.read_sectors( 431*c8f60bfbSAmjad Alsharafi self.boot_sector.root_dir_start() + i, 1 432*c8f60bfbSAmjad Alsharafi ) 433*c8f60bfbSAmjad Alsharafi offset = find_free_entry(sector_data) 434*c8f60bfbSAmjad Alsharafi if offset is not None: 435*c8f60bfbSAmjad Alsharafi new_entry.sector = self.boot_sector.root_dir_start() + i 436*c8f60bfbSAmjad Alsharafi new_entry.offset = offset 437*c8f60bfbSAmjad Alsharafi self.update_direntry(new_entry) 438*c8f60bfbSAmjad Alsharafi return new_entry 439*c8f60bfbSAmjad Alsharafi else: 440*c8f60bfbSAmjad Alsharafi while cluster is not None: 441*c8f60bfbSAmjad Alsharafi data = self.read_cluster(cluster) 442*c8f60bfbSAmjad Alsharafi offset = find_free_entry(data) 443*c8f60bfbSAmjad Alsharafi if offset is not None: 444*c8f60bfbSAmjad Alsharafi new_entry.sector = ( 445*c8f60bfbSAmjad Alsharafi self.boot_sector.first_sector_of_cluster(cluster) 446*c8f60bfbSAmjad Alsharafi + (offset // SECTOR_SIZE)) 447*c8f60bfbSAmjad Alsharafi new_entry.offset = offset % SECTOR_SIZE 448*c8f60bfbSAmjad Alsharafi self.update_direntry(new_entry) 449*c8f60bfbSAmjad Alsharafi return new_entry 450*c8f60bfbSAmjad Alsharafi cluster = self.next_cluster(cluster) 451*c8f60bfbSAmjad Alsharafi 452*c8f60bfbSAmjad Alsharafi raise Exception("No free directory entries") 453*c8f60bfbSAmjad Alsharafi 454*c8f60bfbSAmjad Alsharafi def update_direntry(self, entry: FatDirectoryEntry) -> None: 455*c8f60bfbSAmjad Alsharafi """ 456*c8f60bfbSAmjad Alsharafi Write the directory entry back to the disk. 457*c8f60bfbSAmjad Alsharafi """ 458*c8f60bfbSAmjad Alsharafi sector = self.read_sectors(entry.sector, 1) 459*c8f60bfbSAmjad Alsharafi sector = ( 460*c8f60bfbSAmjad Alsharafi sector[: entry.offset] 461*c8f60bfbSAmjad Alsharafi + entry.as_bytes() 462*c8f60bfbSAmjad Alsharafi + sector[entry.offset + DIRENTRY_SIZE :] 463*c8f60bfbSAmjad Alsharafi ) 464*c8f60bfbSAmjad Alsharafi self.write_sectors(entry.sector, sector) 465*c8f60bfbSAmjad Alsharafi 466*c8f60bfbSAmjad Alsharafi def find_direntry(self, path: str) -> Optional[FatDirectoryEntry]: 467*c8f60bfbSAmjad Alsharafi """ 468*c8f60bfbSAmjad Alsharafi Find the directory entry for the given path. 469*c8f60bfbSAmjad Alsharafi """ 470*c8f60bfbSAmjad Alsharafi assert path[0] == "/", "Path must start with /" 471*c8f60bfbSAmjad Alsharafi 472*c8f60bfbSAmjad Alsharafi path = path[1:] # remove the leading / 473*c8f60bfbSAmjad Alsharafi parts = path.split("/") 474*c8f60bfbSAmjad Alsharafi directory = self.read_root_directory() 475*c8f60bfbSAmjad Alsharafi 476*c8f60bfbSAmjad Alsharafi current_entry = None 477*c8f60bfbSAmjad Alsharafi 478*c8f60bfbSAmjad Alsharafi for i, part in enumerate(parts): 479*c8f60bfbSAmjad Alsharafi is_last = i == len(parts) - 1 480*c8f60bfbSAmjad Alsharafi 481*c8f60bfbSAmjad Alsharafi for entry in directory: 482*c8f60bfbSAmjad Alsharafi if entry.whole_name() == part: 483*c8f60bfbSAmjad Alsharafi current_entry = entry 484*c8f60bfbSAmjad Alsharafi break 485*c8f60bfbSAmjad Alsharafi if current_entry is None: 486*c8f60bfbSAmjad Alsharafi return None 487*c8f60bfbSAmjad Alsharafi 488*c8f60bfbSAmjad Alsharafi if is_last: 489*c8f60bfbSAmjad Alsharafi return current_entry 490*c8f60bfbSAmjad Alsharafi 491*c8f60bfbSAmjad Alsharafi if current_entry.attributes & 0x10 == 0: 492*c8f60bfbSAmjad Alsharafi raise Exception( 493*c8f60bfbSAmjad Alsharafi f"{current_entry.whole_name()} is not a directory" 494*c8f60bfbSAmjad Alsharafi ) 495*c8f60bfbSAmjad Alsharafi 496*c8f60bfbSAmjad Alsharafi directory = self.read_directory(current_entry.cluster) 497*c8f60bfbSAmjad Alsharafi 498*c8f60bfbSAmjad Alsharafi assert False, "Exited loop with is_last == False" 499*c8f60bfbSAmjad Alsharafi 500*c8f60bfbSAmjad Alsharafi def read_file(self, entry: Optional[FatDirectoryEntry]) -> Optional[bytes]: 501*c8f60bfbSAmjad Alsharafi """ 502*c8f60bfbSAmjad Alsharafi Read the content of the file at the given path. 503*c8f60bfbSAmjad Alsharafi """ 504*c8f60bfbSAmjad Alsharafi if entry is None: 505*c8f60bfbSAmjad Alsharafi return None 506*c8f60bfbSAmjad Alsharafi if entry.attributes & 0x10 != 0: 507*c8f60bfbSAmjad Alsharafi raise Exception(f"{entry.whole_name()} is a directory") 508*c8f60bfbSAmjad Alsharafi 509*c8f60bfbSAmjad Alsharafi data = b"" 510*c8f60bfbSAmjad Alsharafi cluster: Optional[int] = entry.cluster 511*c8f60bfbSAmjad Alsharafi while cluster is not None and len(data) <= entry.size_bytes: 512*c8f60bfbSAmjad Alsharafi data += self.read_cluster(cluster) 513*c8f60bfbSAmjad Alsharafi cluster = self.next_cluster(cluster) 514*c8f60bfbSAmjad Alsharafi return data[: entry.size_bytes] 515*c8f60bfbSAmjad Alsharafi 516*c8f60bfbSAmjad Alsharafi def truncate_file( 517*c8f60bfbSAmjad Alsharafi self, 518*c8f60bfbSAmjad Alsharafi entry: FatDirectoryEntry, 519*c8f60bfbSAmjad Alsharafi new_size: int, 520*c8f60bfbSAmjad Alsharafi allocate_non_continuous: bool = False, 521*c8f60bfbSAmjad Alsharafi ) -> None: 522*c8f60bfbSAmjad Alsharafi """ 523*c8f60bfbSAmjad Alsharafi Truncate the file at the given path to the new size. 524*c8f60bfbSAmjad Alsharafi """ 525*c8f60bfbSAmjad Alsharafi if entry is None: 526*c8f60bfbSAmjad Alsharafi raise Exception("entry is None") 527*c8f60bfbSAmjad Alsharafi if entry.attributes & 0x10 != 0: 528*c8f60bfbSAmjad Alsharafi raise Exception(f"{entry.whole_name()} is a directory") 529*c8f60bfbSAmjad Alsharafi 530*c8f60bfbSAmjad Alsharafi def clusters_from_size(size: int) -> int: 531*c8f60bfbSAmjad Alsharafi return ( 532*c8f60bfbSAmjad Alsharafi size + self.boot_sector.cluster_bytes() - 1 533*c8f60bfbSAmjad Alsharafi ) // self.boot_sector.cluster_bytes() 534*c8f60bfbSAmjad Alsharafi 535*c8f60bfbSAmjad Alsharafi # First, allocate new FATs if we need to 536*c8f60bfbSAmjad Alsharafi required_clusters = clusters_from_size(new_size) 537*c8f60bfbSAmjad Alsharafi current_clusters = clusters_from_size(entry.size_bytes) 538*c8f60bfbSAmjad Alsharafi 539*c8f60bfbSAmjad Alsharafi affected_clusters = set() 540*c8f60bfbSAmjad Alsharafi 541*c8f60bfbSAmjad Alsharafi # Keep at least one cluster, easier to manage this way 542*c8f60bfbSAmjad Alsharafi if required_clusters == 0: 543*c8f60bfbSAmjad Alsharafi required_clusters = 1 544*c8f60bfbSAmjad Alsharafi if current_clusters == 0: 545*c8f60bfbSAmjad Alsharafi current_clusters = 1 546*c8f60bfbSAmjad Alsharafi 547*c8f60bfbSAmjad Alsharafi cluster: Optional[int] 548*c8f60bfbSAmjad Alsharafi 549*c8f60bfbSAmjad Alsharafi if required_clusters > current_clusters: 550*c8f60bfbSAmjad Alsharafi # Allocate new clusters 551*c8f60bfbSAmjad Alsharafi cluster = entry.cluster 552*c8f60bfbSAmjad Alsharafi to_add = required_clusters 553*c8f60bfbSAmjad Alsharafi for _ in range(current_clusters - 1): 554*c8f60bfbSAmjad Alsharafi to_add -= 1 555*c8f60bfbSAmjad Alsharafi assert cluster is not None, "Cluster is None" 556*c8f60bfbSAmjad Alsharafi affected_clusters.add(cluster) 557*c8f60bfbSAmjad Alsharafi cluster = self.next_cluster(cluster) 558*c8f60bfbSAmjad Alsharafi assert required_clusters > 0, "No new clusters to allocate" 559*c8f60bfbSAmjad Alsharafi assert cluster is not None, "Cluster is None" 560*c8f60bfbSAmjad Alsharafi assert ( 561*c8f60bfbSAmjad Alsharafi self.next_cluster(cluster) is None 562*c8f60bfbSAmjad Alsharafi ), "Cluster is not the last cluster" 563*c8f60bfbSAmjad Alsharafi 564*c8f60bfbSAmjad Alsharafi # Allocate new clusters 565*c8f60bfbSAmjad Alsharafi for _ in range(to_add - 1): 566*c8f60bfbSAmjad Alsharafi if allocate_non_continuous: 567*c8f60bfbSAmjad Alsharafi new_cluster = self.next_free_cluster_non_continuous() 568*c8f60bfbSAmjad Alsharafi else: 569*c8f60bfbSAmjad Alsharafi new_cluster = self.next_free_cluster() 570*c8f60bfbSAmjad Alsharafi self.write_fat_entry(cluster, new_cluster) 571*c8f60bfbSAmjad Alsharafi self.write_fat_entry(new_cluster, 0xFFFF) 572*c8f60bfbSAmjad Alsharafi cluster = new_cluster 573*c8f60bfbSAmjad Alsharafi 574*c8f60bfbSAmjad Alsharafi elif required_clusters < current_clusters: 575*c8f60bfbSAmjad Alsharafi # Truncate the file 576*c8f60bfbSAmjad Alsharafi cluster = entry.cluster 577*c8f60bfbSAmjad Alsharafi for _ in range(required_clusters - 1): 578*c8f60bfbSAmjad Alsharafi assert cluster is not None, "Cluster is None" 579*c8f60bfbSAmjad Alsharafi cluster = self.next_cluster(cluster) 580*c8f60bfbSAmjad Alsharafi assert cluster is not None, "Cluster is None" 581*c8f60bfbSAmjad Alsharafi 582*c8f60bfbSAmjad Alsharafi next_cluster = self.next_cluster(cluster) 583*c8f60bfbSAmjad Alsharafi # mark last as EOF 584*c8f60bfbSAmjad Alsharafi self.write_fat_entry(cluster, 0xFFFF) 585*c8f60bfbSAmjad Alsharafi # free the rest 586*c8f60bfbSAmjad Alsharafi while next_cluster is not None: 587*c8f60bfbSAmjad Alsharafi cluster = next_cluster 588*c8f60bfbSAmjad Alsharafi next_cluster = self.next_cluster(next_cluster) 589*c8f60bfbSAmjad Alsharafi self.write_fat_entry(cluster, 0) 590*c8f60bfbSAmjad Alsharafi 591*c8f60bfbSAmjad Alsharafi self.flush_fats() 592*c8f60bfbSAmjad Alsharafi 593*c8f60bfbSAmjad Alsharafi # verify number of clusters 594*c8f60bfbSAmjad Alsharafi cluster = entry.cluster 595*c8f60bfbSAmjad Alsharafi count = 0 596*c8f60bfbSAmjad Alsharafi while cluster is not None: 597*c8f60bfbSAmjad Alsharafi count += 1 598*c8f60bfbSAmjad Alsharafi affected_clusters.add(cluster) 599*c8f60bfbSAmjad Alsharafi cluster = self.next_cluster(cluster) 600*c8f60bfbSAmjad Alsharafi assert ( 601*c8f60bfbSAmjad Alsharafi count == required_clusters 602*c8f60bfbSAmjad Alsharafi ), f"Expected {required_clusters} clusters, got {count}" 603*c8f60bfbSAmjad Alsharafi 604*c8f60bfbSAmjad Alsharafi # update the size 605*c8f60bfbSAmjad Alsharafi entry.size_bytes = new_size 606*c8f60bfbSAmjad Alsharafi self.update_direntry(entry) 607*c8f60bfbSAmjad Alsharafi 608*c8f60bfbSAmjad Alsharafi # trigger every affected cluster 609*c8f60bfbSAmjad Alsharafi for cluster in affected_clusters: 610*c8f60bfbSAmjad Alsharafi first_sector = self.boot_sector.first_sector_of_cluster(cluster) 611*c8f60bfbSAmjad Alsharafi first_sector_data = self.read_sectors(first_sector, 1) 612*c8f60bfbSAmjad Alsharafi self.write_sectors(first_sector, first_sector_data) 613*c8f60bfbSAmjad Alsharafi 614*c8f60bfbSAmjad Alsharafi def write_file(self, entry: FatDirectoryEntry, data: bytes) -> None: 615*c8f60bfbSAmjad Alsharafi """ 616*c8f60bfbSAmjad Alsharafi Write the content of the file at the given path. 617*c8f60bfbSAmjad Alsharafi """ 618*c8f60bfbSAmjad Alsharafi if entry is None: 619*c8f60bfbSAmjad Alsharafi raise Exception("entry is None") 620*c8f60bfbSAmjad Alsharafi if entry.attributes & 0x10 != 0: 621*c8f60bfbSAmjad Alsharafi raise Exception(f"{entry.whole_name()} is a directory") 622*c8f60bfbSAmjad Alsharafi 623*c8f60bfbSAmjad Alsharafi data_len = len(data) 624*c8f60bfbSAmjad Alsharafi 625*c8f60bfbSAmjad Alsharafi self.truncate_file(entry, data_len) 626*c8f60bfbSAmjad Alsharafi 627*c8f60bfbSAmjad Alsharafi cluster: Optional[int] = entry.cluster 628*c8f60bfbSAmjad Alsharafi while cluster is not None: 629*c8f60bfbSAmjad Alsharafi data_to_write = data[: self.boot_sector.cluster_bytes()] 630*c8f60bfbSAmjad Alsharafi if len(data_to_write) < self.boot_sector.cluster_bytes(): 631*c8f60bfbSAmjad Alsharafi old_data = self.read_cluster(cluster) 632*c8f60bfbSAmjad Alsharafi data_to_write += old_data[len(data_to_write) :] 633*c8f60bfbSAmjad Alsharafi 634*c8f60bfbSAmjad Alsharafi self.write_cluster(cluster, data_to_write) 635*c8f60bfbSAmjad Alsharafi data = data[self.boot_sector.cluster_bytes() :] 636*c8f60bfbSAmjad Alsharafi if len(data) == 0: 637*c8f60bfbSAmjad Alsharafi break 638*c8f60bfbSAmjad Alsharafi cluster = self.next_cluster(cluster) 639*c8f60bfbSAmjad Alsharafi 640*c8f60bfbSAmjad Alsharafi assert ( 641*c8f60bfbSAmjad Alsharafi len(data) == 0 642*c8f60bfbSAmjad Alsharafi ), "Data was not written completely, clusters missing" 643*c8f60bfbSAmjad Alsharafi 644*c8f60bfbSAmjad Alsharafi def create_file(self, path: str) -> Optional[FatDirectoryEntry]: 645*c8f60bfbSAmjad Alsharafi """ 646*c8f60bfbSAmjad Alsharafi Create a new file at the given path. 647*c8f60bfbSAmjad Alsharafi """ 648*c8f60bfbSAmjad Alsharafi assert path[0] == "/", "Path must start with /" 649*c8f60bfbSAmjad Alsharafi 650*c8f60bfbSAmjad Alsharafi path = path[1:] # remove the leading / 651*c8f60bfbSAmjad Alsharafi 652*c8f60bfbSAmjad Alsharafi parts = path.split("/") 653*c8f60bfbSAmjad Alsharafi 654*c8f60bfbSAmjad Alsharafi directory_cluster = None 655*c8f60bfbSAmjad Alsharafi directory = self.read_root_directory() 656*c8f60bfbSAmjad Alsharafi 657*c8f60bfbSAmjad Alsharafi parts, filename = parts[:-1], parts[-1] 658*c8f60bfbSAmjad Alsharafi 659*c8f60bfbSAmjad Alsharafi for _, part in enumerate(parts): 660*c8f60bfbSAmjad Alsharafi current_entry = None 661*c8f60bfbSAmjad Alsharafi for entry in directory: 662*c8f60bfbSAmjad Alsharafi if entry.whole_name() == part: 663*c8f60bfbSAmjad Alsharafi current_entry = entry 664*c8f60bfbSAmjad Alsharafi break 665*c8f60bfbSAmjad Alsharafi if current_entry is None: 666*c8f60bfbSAmjad Alsharafi return None 667*c8f60bfbSAmjad Alsharafi 668*c8f60bfbSAmjad Alsharafi if current_entry.attributes & 0x10 == 0: 669*c8f60bfbSAmjad Alsharafi raise Exception( 670*c8f60bfbSAmjad Alsharafi f"{current_entry.whole_name()} is not a directory" 671*c8f60bfbSAmjad Alsharafi ) 672*c8f60bfbSAmjad Alsharafi 673*c8f60bfbSAmjad Alsharafi directory = self.read_directory(current_entry.cluster) 674*c8f60bfbSAmjad Alsharafi directory_cluster = current_entry.cluster 675*c8f60bfbSAmjad Alsharafi 676*c8f60bfbSAmjad Alsharafi # add new entry to the directory 677*c8f60bfbSAmjad Alsharafi 678*c8f60bfbSAmjad Alsharafi filename, ext = filename.split(".") 679*c8f60bfbSAmjad Alsharafi 680*c8f60bfbSAmjad Alsharafi if len(ext) > 3: 681*c8f60bfbSAmjad Alsharafi raise Exception("Ext must be 3 characters or less") 682*c8f60bfbSAmjad Alsharafi if len(filename) > 8: 683*c8f60bfbSAmjad Alsharafi raise Exception("Name must be 8 characters or less") 684*c8f60bfbSAmjad Alsharafi 685*c8f60bfbSAmjad Alsharafi for c in filename + ext: 686*c8f60bfbSAmjad Alsharafi 687*c8f60bfbSAmjad Alsharafi if c not in ALLOWED_FILE_CHARS: 688*c8f60bfbSAmjad Alsharafi raise Exception("Invalid character in filename") 689*c8f60bfbSAmjad Alsharafi 690*c8f60bfbSAmjad Alsharafi return self.add_direntry(directory_cluster, filename, ext, 0) 691