xref: /openbmc/qemu/tests/qemu-iotests/fat16.py (revision 6d00c6f982562222adbd0613966285792125abe5)
1*c8f60bfbSAmjad Alsharafi# A simple FAT16 driver that is used to test the `vvfat` driver in QEMU.
2*c8f60bfbSAmjad Alsharafi#
3*c8f60bfbSAmjad Alsharafi# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com>
4*c8f60bfbSAmjad Alsharafi#
5*c8f60bfbSAmjad Alsharafi# This program is free software; you can redistribute it and/or modify
6*c8f60bfbSAmjad Alsharafi# it under the terms of the GNU General Public License as published by
7*c8f60bfbSAmjad Alsharafi# the Free Software Foundation; either version 2 of the License, or
8*c8f60bfbSAmjad Alsharafi# (at your option) any later version.
9*c8f60bfbSAmjad Alsharafi#
10*c8f60bfbSAmjad Alsharafi# This program is distributed in the hope that it will be useful,
11*c8f60bfbSAmjad Alsharafi# but WITHOUT ANY WARRANTY; without even the implied warranty of
12*c8f60bfbSAmjad Alsharafi# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13*c8f60bfbSAmjad Alsharafi# GNU General Public License for more details.
14*c8f60bfbSAmjad Alsharafi#
15*c8f60bfbSAmjad Alsharafi# You should have received a copy of the GNU General Public License
16*c8f60bfbSAmjad Alsharafi# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17*c8f60bfbSAmjad Alsharafi
18*c8f60bfbSAmjad Alsharafifrom typing import Callable, List, Optional, Protocol, Set
19*c8f60bfbSAmjad Alsharafiimport string
20*c8f60bfbSAmjad Alsharafi
21*c8f60bfbSAmjad AlsharafiSECTOR_SIZE = 512
22*c8f60bfbSAmjad AlsharafiDIRENTRY_SIZE = 32
23*c8f60bfbSAmjad AlsharafiALLOWED_FILE_CHARS = set(
24*c8f60bfbSAmjad Alsharafi    "!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase
25*c8f60bfbSAmjad Alsharafi)
26*c8f60bfbSAmjad Alsharafi
27*c8f60bfbSAmjad Alsharafi
28*c8f60bfbSAmjad Alsharaficlass MBR:
29*c8f60bfbSAmjad Alsharafi    def __init__(self, data: bytes):
30*c8f60bfbSAmjad Alsharafi        assert len(data) == 512
31*c8f60bfbSAmjad Alsharafi        self.partition_table = []
32*c8f60bfbSAmjad Alsharafi        for i in range(4):
33*c8f60bfbSAmjad Alsharafi            partition = data[446 + i * 16 : 446 + (i + 1) * 16]
34*c8f60bfbSAmjad Alsharafi            self.partition_table.append(
35*c8f60bfbSAmjad Alsharafi                {
36*c8f60bfbSAmjad Alsharafi                    "status": partition[0],
37*c8f60bfbSAmjad Alsharafi                    "start_head": partition[1],
38*c8f60bfbSAmjad Alsharafi                    "start_sector": partition[2] & 0x3F,
39*c8f60bfbSAmjad Alsharafi                    "start_cylinder": ((partition[2] & 0xC0) << 2)
40*c8f60bfbSAmjad Alsharafi                                      | partition[3],
41*c8f60bfbSAmjad Alsharafi                    "type": partition[4],
42*c8f60bfbSAmjad Alsharafi                    "end_head": partition[5],
43*c8f60bfbSAmjad Alsharafi                    "end_sector": partition[6] & 0x3F,
44*c8f60bfbSAmjad Alsharafi                    "end_cylinder": ((partition[6] & 0xC0) << 2)
45*c8f60bfbSAmjad Alsharafi                                    | partition[7],
46*c8f60bfbSAmjad Alsharafi                    "start_lba": int.from_bytes(partition[8:12], "little"),
47*c8f60bfbSAmjad Alsharafi                    "size": int.from_bytes(partition[12:16], "little"),
48*c8f60bfbSAmjad Alsharafi                }
49*c8f60bfbSAmjad Alsharafi            )
50*c8f60bfbSAmjad Alsharafi
51*c8f60bfbSAmjad Alsharafi    def __str__(self):
52*c8f60bfbSAmjad Alsharafi        return "\n".join(
53*c8f60bfbSAmjad Alsharafi            [
54*c8f60bfbSAmjad Alsharafi                f"{i}: {partition}"
55*c8f60bfbSAmjad Alsharafi                for i, partition in enumerate(self.partition_table)
56*c8f60bfbSAmjad Alsharafi            ]
57*c8f60bfbSAmjad Alsharafi        )
58*c8f60bfbSAmjad Alsharafi
59*c8f60bfbSAmjad Alsharafi
60*c8f60bfbSAmjad Alsharaficlass FatBootSector:
61*c8f60bfbSAmjad Alsharafi    # pylint: disable=too-many-instance-attributes
62*c8f60bfbSAmjad Alsharafi    def __init__(self, data: bytes):
63*c8f60bfbSAmjad Alsharafi        assert len(data) == 512
64*c8f60bfbSAmjad Alsharafi        self.bytes_per_sector = int.from_bytes(data[11:13], "little")
65*c8f60bfbSAmjad Alsharafi        self.sectors_per_cluster = data[13]
66*c8f60bfbSAmjad Alsharafi        self.reserved_sectors = int.from_bytes(data[14:16], "little")
67*c8f60bfbSAmjad Alsharafi        self.fat_count = data[16]
68*c8f60bfbSAmjad Alsharafi        self.root_entries = int.from_bytes(data[17:19], "little")
69*c8f60bfbSAmjad Alsharafi        total_sectors_16 = int.from_bytes(data[19:21], "little")
70*c8f60bfbSAmjad Alsharafi        self.media_descriptor = data[21]
71*c8f60bfbSAmjad Alsharafi        self.sectors_per_fat = int.from_bytes(data[22:24], "little")
72*c8f60bfbSAmjad Alsharafi        self.sectors_per_track = int.from_bytes(data[24:26], "little")
73*c8f60bfbSAmjad Alsharafi        self.heads = int.from_bytes(data[26:28], "little")
74*c8f60bfbSAmjad Alsharafi        self.hidden_sectors = int.from_bytes(data[28:32], "little")
75*c8f60bfbSAmjad Alsharafi        total_sectors_32 = int.from_bytes(data[32:36], "little")
76*c8f60bfbSAmjad Alsharafi        assert (
77*c8f60bfbSAmjad Alsharafi            total_sectors_16 == 0 or total_sectors_32 == 0
78*c8f60bfbSAmjad Alsharafi        ), "Both total sectors (16 and 32) fields are non-zero"
79*c8f60bfbSAmjad Alsharafi        self.total_sectors = total_sectors_16 or total_sectors_32
80*c8f60bfbSAmjad Alsharafi        self.drive_number = data[36]
81*c8f60bfbSAmjad Alsharafi        self.volume_id = int.from_bytes(data[39:43], "little")
82*c8f60bfbSAmjad Alsharafi        self.volume_label = data[43:54].decode("ascii").strip()
83*c8f60bfbSAmjad Alsharafi        self.fs_type = data[54:62].decode("ascii").strip()
84*c8f60bfbSAmjad Alsharafi
85*c8f60bfbSAmjad Alsharafi    def root_dir_start(self):
86*c8f60bfbSAmjad Alsharafi        """
87*c8f60bfbSAmjad Alsharafi        Calculate the start sector of the root directory.
88*c8f60bfbSAmjad Alsharafi        """
89*c8f60bfbSAmjad Alsharafi        return self.reserved_sectors + self.fat_count * self.sectors_per_fat
90*c8f60bfbSAmjad Alsharafi
91*c8f60bfbSAmjad Alsharafi    def root_dir_size(self):
92*c8f60bfbSAmjad Alsharafi        """
93*c8f60bfbSAmjad Alsharafi        Calculate the size of the root directory in sectors.
94*c8f60bfbSAmjad Alsharafi        """
95*c8f60bfbSAmjad Alsharafi        return (
96*c8f60bfbSAmjad Alsharafi            self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1
97*c8f60bfbSAmjad Alsharafi        ) // self.bytes_per_sector
98*c8f60bfbSAmjad Alsharafi
99*c8f60bfbSAmjad Alsharafi    def data_sector_start(self):
100*c8f60bfbSAmjad Alsharafi        """
101*c8f60bfbSAmjad Alsharafi        Calculate the start sector of the data region.
102*c8f60bfbSAmjad Alsharafi        """
103*c8f60bfbSAmjad Alsharafi        return self.root_dir_start() + self.root_dir_size()
104*c8f60bfbSAmjad Alsharafi
105*c8f60bfbSAmjad Alsharafi    def first_sector_of_cluster(self, cluster: int) -> int:
106*c8f60bfbSAmjad Alsharafi        """
107*c8f60bfbSAmjad Alsharafi        Calculate the first sector of the given cluster.
108*c8f60bfbSAmjad Alsharafi        """
109*c8f60bfbSAmjad Alsharafi        return (
110*c8f60bfbSAmjad Alsharafi            self.data_sector_start() + (cluster - 2) * self.sectors_per_cluster
111*c8f60bfbSAmjad Alsharafi        )
112*c8f60bfbSAmjad Alsharafi
113*c8f60bfbSAmjad Alsharafi    def cluster_bytes(self):
114*c8f60bfbSAmjad Alsharafi        """
115*c8f60bfbSAmjad Alsharafi        Calculate the number of bytes in a cluster.
116*c8f60bfbSAmjad Alsharafi        """
117*c8f60bfbSAmjad Alsharafi        return self.bytes_per_sector * self.sectors_per_cluster
118*c8f60bfbSAmjad Alsharafi
119*c8f60bfbSAmjad Alsharafi    def __str__(self):
120*c8f60bfbSAmjad Alsharafi        return (
121*c8f60bfbSAmjad Alsharafi            f"Bytes per sector: {self.bytes_per_sector}\n"
122*c8f60bfbSAmjad Alsharafi            f"Sectors per cluster: {self.sectors_per_cluster}\n"
123*c8f60bfbSAmjad Alsharafi            f"Reserved sectors: {self.reserved_sectors}\n"
124*c8f60bfbSAmjad Alsharafi            f"FAT count: {self.fat_count}\n"
125*c8f60bfbSAmjad Alsharafi            f"Root entries: {self.root_entries}\n"
126*c8f60bfbSAmjad Alsharafi            f"Total sectors: {self.total_sectors}\n"
127*c8f60bfbSAmjad Alsharafi            f"Media descriptor: {self.media_descriptor}\n"
128*c8f60bfbSAmjad Alsharafi            f"Sectors per FAT: {self.sectors_per_fat}\n"
129*c8f60bfbSAmjad Alsharafi            f"Sectors per track: {self.sectors_per_track}\n"
130*c8f60bfbSAmjad Alsharafi            f"Heads: {self.heads}\n"
131*c8f60bfbSAmjad Alsharafi            f"Hidden sectors: {self.hidden_sectors}\n"
132*c8f60bfbSAmjad Alsharafi            f"Drive number: {self.drive_number}\n"
133*c8f60bfbSAmjad Alsharafi            f"Volume ID: {self.volume_id}\n"
134*c8f60bfbSAmjad Alsharafi            f"Volume label: {self.volume_label}\n"
135*c8f60bfbSAmjad Alsharafi            f"FS type: {self.fs_type}\n"
136*c8f60bfbSAmjad Alsharafi        )
137*c8f60bfbSAmjad Alsharafi
138*c8f60bfbSAmjad Alsharafi
139*c8f60bfbSAmjad Alsharaficlass FatDirectoryEntry:
140*c8f60bfbSAmjad Alsharafi    # pylint: disable=too-many-instance-attributes
141*c8f60bfbSAmjad Alsharafi    def __init__(self, data: bytes, sector: int, offset: int):
142*c8f60bfbSAmjad Alsharafi        self.name = data[0:8].decode("ascii").strip()
143*c8f60bfbSAmjad Alsharafi        self.ext = data[8:11].decode("ascii").strip()
144*c8f60bfbSAmjad Alsharafi        self.attributes = data[11]
145*c8f60bfbSAmjad Alsharafi        self.reserved = data[12]
146*c8f60bfbSAmjad Alsharafi        self.create_time_tenth = data[13]
147*c8f60bfbSAmjad Alsharafi        self.create_time = int.from_bytes(data[14:16], "little")
148*c8f60bfbSAmjad Alsharafi        self.create_date = int.from_bytes(data[16:18], "little")
149*c8f60bfbSAmjad Alsharafi        self.last_access_date = int.from_bytes(data[18:20], "little")
150*c8f60bfbSAmjad Alsharafi        high_cluster = int.from_bytes(data[20:22], "little")
151*c8f60bfbSAmjad Alsharafi        self.last_mod_time = int.from_bytes(data[22:24], "little")
152*c8f60bfbSAmjad Alsharafi        self.last_mod_date = int.from_bytes(data[24:26], "little")
153*c8f60bfbSAmjad Alsharafi        low_cluster = int.from_bytes(data[26:28], "little")
154*c8f60bfbSAmjad Alsharafi        self.cluster = (high_cluster << 16) | low_cluster
155*c8f60bfbSAmjad Alsharafi        self.size_bytes = int.from_bytes(data[28:32], "little")
156*c8f60bfbSAmjad Alsharafi
157*c8f60bfbSAmjad Alsharafi        # extra (to help write back to disk)
158*c8f60bfbSAmjad Alsharafi        self.sector = sector
159*c8f60bfbSAmjad Alsharafi        self.offset = offset
160*c8f60bfbSAmjad Alsharafi
161*c8f60bfbSAmjad Alsharafi    def as_bytes(self) -> bytes:
162*c8f60bfbSAmjad Alsharafi        return (
163*c8f60bfbSAmjad Alsharafi            self.name.ljust(8, " ").encode("ascii")
164*c8f60bfbSAmjad Alsharafi            + self.ext.ljust(3, " ").encode("ascii")
165*c8f60bfbSAmjad Alsharafi            + self.attributes.to_bytes(1, "little")
166*c8f60bfbSAmjad Alsharafi            + self.reserved.to_bytes(1, "little")
167*c8f60bfbSAmjad Alsharafi            + self.create_time_tenth.to_bytes(1, "little")
168*c8f60bfbSAmjad Alsharafi            + self.create_time.to_bytes(2, "little")
169*c8f60bfbSAmjad Alsharafi            + self.create_date.to_bytes(2, "little")
170*c8f60bfbSAmjad Alsharafi            + self.last_access_date.to_bytes(2, "little")
171*c8f60bfbSAmjad Alsharafi            + (self.cluster >> 16).to_bytes(2, "little")
172*c8f60bfbSAmjad Alsharafi            + self.last_mod_time.to_bytes(2, "little")
173*c8f60bfbSAmjad Alsharafi            + self.last_mod_date.to_bytes(2, "little")
174*c8f60bfbSAmjad Alsharafi            + (self.cluster & 0xFFFF).to_bytes(2, "little")
175*c8f60bfbSAmjad Alsharafi            + self.size_bytes.to_bytes(4, "little")
176*c8f60bfbSAmjad Alsharafi        )
177*c8f60bfbSAmjad Alsharafi
178*c8f60bfbSAmjad Alsharafi    def whole_name(self):
179*c8f60bfbSAmjad Alsharafi        if self.ext:
180*c8f60bfbSAmjad Alsharafi            return f"{self.name}.{self.ext}"
181*c8f60bfbSAmjad Alsharafi        else:
182*c8f60bfbSAmjad Alsharafi            return self.name
183*c8f60bfbSAmjad Alsharafi
184*c8f60bfbSAmjad Alsharafi    def __str__(self):
185*c8f60bfbSAmjad Alsharafi        return (
186*c8f60bfbSAmjad Alsharafi            f"Name: {self.name}\n"
187*c8f60bfbSAmjad Alsharafi            f"Ext: {self.ext}\n"
188*c8f60bfbSAmjad Alsharafi            f"Attributes: {self.attributes}\n"
189*c8f60bfbSAmjad Alsharafi            f"Reserved: {self.reserved}\n"
190*c8f60bfbSAmjad Alsharafi            f"Create time tenth: {self.create_time_tenth}\n"
191*c8f60bfbSAmjad Alsharafi            f"Create time: {self.create_time}\n"
192*c8f60bfbSAmjad Alsharafi            f"Create date: {self.create_date}\n"
193*c8f60bfbSAmjad Alsharafi            f"Last access date: {self.last_access_date}\n"
194*c8f60bfbSAmjad Alsharafi            f"Last mod time: {self.last_mod_time}\n"
195*c8f60bfbSAmjad Alsharafi            f"Last mod date: {self.last_mod_date}\n"
196*c8f60bfbSAmjad Alsharafi            f"Cluster: {self.cluster}\n"
197*c8f60bfbSAmjad Alsharafi            f"Size: {self.size_bytes}\n"
198*c8f60bfbSAmjad Alsharafi        )
199*c8f60bfbSAmjad Alsharafi
200*c8f60bfbSAmjad Alsharafi    def __repr__(self):
201*c8f60bfbSAmjad Alsharafi        # convert to dict
202*c8f60bfbSAmjad Alsharafi        return str(vars(self))
203*c8f60bfbSAmjad Alsharafi
204*c8f60bfbSAmjad Alsharafi
205*c8f60bfbSAmjad Alsharaficlass SectorReader(Protocol):
206*c8f60bfbSAmjad Alsharafi    def __call__(self, start_sector: int, num_sectors: int = 1) -> bytes: ...
207*c8f60bfbSAmjad Alsharafi
208*c8f60bfbSAmjad Alsharafi# pylint: disable=broad-exception-raised
209*c8f60bfbSAmjad Alsharaficlass Fat16:
210*c8f60bfbSAmjad Alsharafi    def __init__(
211*c8f60bfbSAmjad Alsharafi        self,
212*c8f60bfbSAmjad Alsharafi        start_sector: int,
213*c8f60bfbSAmjad Alsharafi        size: int,
214*c8f60bfbSAmjad Alsharafi        sector_reader: SectorReader,
215*c8f60bfbSAmjad Alsharafi        sector_writer: Callable[[int, bytes], None]
216*c8f60bfbSAmjad Alsharafi    ):
217*c8f60bfbSAmjad Alsharafi        self.start_sector = start_sector
218*c8f60bfbSAmjad Alsharafi        self.size_in_sectors = size
219*c8f60bfbSAmjad Alsharafi        self.sector_reader = sector_reader
220*c8f60bfbSAmjad Alsharafi        self.sector_writer = sector_writer
221*c8f60bfbSAmjad Alsharafi
222*c8f60bfbSAmjad Alsharafi        self.boot_sector = FatBootSector(self.sector_reader(start_sector, 1))
223*c8f60bfbSAmjad Alsharafi
224*c8f60bfbSAmjad Alsharafi        fat_size_in_sectors = (
225*c8f60bfbSAmjad Alsharafi            self.boot_sector.sectors_per_fat * self.boot_sector.fat_count
226*c8f60bfbSAmjad Alsharafi        )
227*c8f60bfbSAmjad Alsharafi        self.fats = self.read_sectors(
228*c8f60bfbSAmjad Alsharafi            self.boot_sector.reserved_sectors, fat_size_in_sectors
229*c8f60bfbSAmjad Alsharafi        )
230*c8f60bfbSAmjad Alsharafi        self.fats_dirty_sectors: Set[int] = set()
231*c8f60bfbSAmjad Alsharafi
232*c8f60bfbSAmjad Alsharafi    def read_sectors(self, start_sector: int, num_sectors: int) -> bytes:
233*c8f60bfbSAmjad Alsharafi        return self.sector_reader(start_sector + self.start_sector,
234*c8f60bfbSAmjad Alsharafi                                  num_sectors)
235*c8f60bfbSAmjad Alsharafi
236*c8f60bfbSAmjad Alsharafi    def write_sectors(self, start_sector: int, data: bytes) -> None:
237*c8f60bfbSAmjad Alsharafi        return self.sector_writer(start_sector + self.start_sector, data)
238*c8f60bfbSAmjad Alsharafi
239*c8f60bfbSAmjad Alsharafi    def directory_from_bytes(
240*c8f60bfbSAmjad Alsharafi        self, data: bytes, start_sector: int
241*c8f60bfbSAmjad Alsharafi    ) -> List[FatDirectoryEntry]:
242*c8f60bfbSAmjad Alsharafi        """
243*c8f60bfbSAmjad Alsharafi        Convert `bytes` into a list of `FatDirectoryEntry` objects.
244*c8f60bfbSAmjad Alsharafi        Will ignore long file names.
245*c8f60bfbSAmjad Alsharafi        Will stop when it encounters a 0x00 byte.
246*c8f60bfbSAmjad Alsharafi        """
247*c8f60bfbSAmjad Alsharafi
248*c8f60bfbSAmjad Alsharafi        entries = []
249*c8f60bfbSAmjad Alsharafi        for i in range(0, len(data), DIRENTRY_SIZE):
250*c8f60bfbSAmjad Alsharafi            entry = data[i : i + DIRENTRY_SIZE]
251*c8f60bfbSAmjad Alsharafi
252*c8f60bfbSAmjad Alsharafi            current_sector = start_sector + (i // SECTOR_SIZE)
253*c8f60bfbSAmjad Alsharafi            current_offset = i % SECTOR_SIZE
254*c8f60bfbSAmjad Alsharafi
255*c8f60bfbSAmjad Alsharafi            if entry[0] == 0:
256*c8f60bfbSAmjad Alsharafi                break
257*c8f60bfbSAmjad Alsharafi
258*c8f60bfbSAmjad Alsharafi            if entry[0] == 0xE5:
259*c8f60bfbSAmjad Alsharafi                # Deleted file
260*c8f60bfbSAmjad Alsharafi                continue
261*c8f60bfbSAmjad Alsharafi
262*c8f60bfbSAmjad Alsharafi            if entry[11] & 0xF == 0xF:
263*c8f60bfbSAmjad Alsharafi                # Long file name
264*c8f60bfbSAmjad Alsharafi                continue
265*c8f60bfbSAmjad Alsharafi
266*c8f60bfbSAmjad Alsharafi            entries.append(
267*c8f60bfbSAmjad Alsharafi                FatDirectoryEntry(entry, current_sector, current_offset)
268*c8f60bfbSAmjad Alsharafi            )
269*c8f60bfbSAmjad Alsharafi        return entries
270*c8f60bfbSAmjad Alsharafi
271*c8f60bfbSAmjad Alsharafi    def read_root_directory(self) -> List[FatDirectoryEntry]:
272*c8f60bfbSAmjad Alsharafi        root_dir = self.read_sectors(
273*c8f60bfbSAmjad Alsharafi            self.boot_sector.root_dir_start(), self.boot_sector.root_dir_size()
274*c8f60bfbSAmjad Alsharafi        )
275*c8f60bfbSAmjad Alsharafi        return self.directory_from_bytes(
276*c8f60bfbSAmjad Alsharafi            root_dir, self.boot_sector.root_dir_start()
277*c8f60bfbSAmjad Alsharafi        )
278*c8f60bfbSAmjad Alsharafi
279*c8f60bfbSAmjad Alsharafi    def read_fat_entry(self, cluster: int) -> int:
280*c8f60bfbSAmjad Alsharafi        """
281*c8f60bfbSAmjad Alsharafi        Read the FAT entry for the given cluster.
282*c8f60bfbSAmjad Alsharafi        """
283*c8f60bfbSAmjad Alsharafi        fat_offset = cluster * 2  # FAT16
284*c8f60bfbSAmjad Alsharafi        return int.from_bytes(self.fats[fat_offset : fat_offset + 2], "little")
285*c8f60bfbSAmjad Alsharafi
286*c8f60bfbSAmjad Alsharafi    def write_fat_entry(self, cluster: int, value: int) -> None:
287*c8f60bfbSAmjad Alsharafi        """
288*c8f60bfbSAmjad Alsharafi        Write the FAT entry for the given cluster.
289*c8f60bfbSAmjad Alsharafi        """
290*c8f60bfbSAmjad Alsharafi        fat_offset = cluster * 2
291*c8f60bfbSAmjad Alsharafi        self.fats = (
292*c8f60bfbSAmjad Alsharafi            self.fats[:fat_offset]
293*c8f60bfbSAmjad Alsharafi            + value.to_bytes(2, "little")
294*c8f60bfbSAmjad Alsharafi            + self.fats[fat_offset + 2 :]
295*c8f60bfbSAmjad Alsharafi        )
296*c8f60bfbSAmjad Alsharafi        self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE)
297*c8f60bfbSAmjad Alsharafi
298*c8f60bfbSAmjad Alsharafi    def flush_fats(self) -> None:
299*c8f60bfbSAmjad Alsharafi        """
300*c8f60bfbSAmjad Alsharafi        Write the FATs back to the disk.
301*c8f60bfbSAmjad Alsharafi        """
302*c8f60bfbSAmjad Alsharafi        for sector in self.fats_dirty_sectors:
303*c8f60bfbSAmjad Alsharafi            data = self.fats[sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE]
304*c8f60bfbSAmjad Alsharafi            sector = self.boot_sector.reserved_sectors + sector
305*c8f60bfbSAmjad Alsharafi            self.write_sectors(sector, data)
306*c8f60bfbSAmjad Alsharafi        self.fats_dirty_sectors = set()
307*c8f60bfbSAmjad Alsharafi
308*c8f60bfbSAmjad Alsharafi    def next_cluster(self, cluster: int) -> Optional[int]:
309*c8f60bfbSAmjad Alsharafi        """
310*c8f60bfbSAmjad Alsharafi        Get the next cluster in the chain.
311*c8f60bfbSAmjad Alsharafi        If its `None`, then its the last cluster.
312*c8f60bfbSAmjad Alsharafi        The function will crash if the next cluster
313*c8f60bfbSAmjad Alsharafi        is `FREE` (unexpected) or invalid entry.
314*c8f60bfbSAmjad Alsharafi        """
315*c8f60bfbSAmjad Alsharafi        fat_entry = self.read_fat_entry(cluster)
316*c8f60bfbSAmjad Alsharafi        if fat_entry == 0:
317*c8f60bfbSAmjad Alsharafi            raise Exception("Unexpected: FREE cluster")
318*c8f60bfbSAmjad Alsharafi        if fat_entry == 1:
319*c8f60bfbSAmjad Alsharafi            raise Exception("Unexpected: RESERVED cluster")
320*c8f60bfbSAmjad Alsharafi        if fat_entry >= 0xFFF8:
321*c8f60bfbSAmjad Alsharafi            return None
322*c8f60bfbSAmjad Alsharafi        if fat_entry >= 0xFFF7:
323*c8f60bfbSAmjad Alsharafi            raise Exception("Invalid FAT entry")
324*c8f60bfbSAmjad Alsharafi
325*c8f60bfbSAmjad Alsharafi        return fat_entry
326*c8f60bfbSAmjad Alsharafi
327*c8f60bfbSAmjad Alsharafi    def next_free_cluster(self) -> int:
328*c8f60bfbSAmjad Alsharafi        """
329*c8f60bfbSAmjad Alsharafi        Find the next free cluster.
330*c8f60bfbSAmjad Alsharafi        """
331*c8f60bfbSAmjad Alsharafi        # simple linear search
332*c8f60bfbSAmjad Alsharafi        for i in range(2, 0xFFFF):
333*c8f60bfbSAmjad Alsharafi            if self.read_fat_entry(i) == 0:
334*c8f60bfbSAmjad Alsharafi                return i
335*c8f60bfbSAmjad Alsharafi        raise Exception("No free clusters")
336*c8f60bfbSAmjad Alsharafi
337*c8f60bfbSAmjad Alsharafi    def next_free_cluster_non_continuous(self) -> int:
338*c8f60bfbSAmjad Alsharafi        """
339*c8f60bfbSAmjad Alsharafi        Find the next free cluster, but makes sure
340*c8f60bfbSAmjad Alsharafi        that the cluster before and after it are not allocated.
341*c8f60bfbSAmjad Alsharafi        """
342*c8f60bfbSAmjad Alsharafi        # simple linear search
343*c8f60bfbSAmjad Alsharafi        before = False
344*c8f60bfbSAmjad Alsharafi        for i in range(2, 0xFFFF):
345*c8f60bfbSAmjad Alsharafi            if self.read_fat_entry(i) == 0:
346*c8f60bfbSAmjad Alsharafi                if before and self.read_fat_entry(i + 1) == 0:
347*c8f60bfbSAmjad Alsharafi                    return i
348*c8f60bfbSAmjad Alsharafi                else:
349*c8f60bfbSAmjad Alsharafi                    before = True
350*c8f60bfbSAmjad Alsharafi            else:
351*c8f60bfbSAmjad Alsharafi                before = False
352*c8f60bfbSAmjad Alsharafi
353*c8f60bfbSAmjad Alsharafi        raise Exception("No free clusters")
354*c8f60bfbSAmjad Alsharafi
355*c8f60bfbSAmjad Alsharafi    def read_cluster(self, cluster: int) -> bytes:
356*c8f60bfbSAmjad Alsharafi        """
357*c8f60bfbSAmjad Alsharafi        Read the cluster at the given cluster.
358*c8f60bfbSAmjad Alsharafi        """
359*c8f60bfbSAmjad Alsharafi        return self.read_sectors(
360*c8f60bfbSAmjad Alsharafi            self.boot_sector.first_sector_of_cluster(cluster),
361*c8f60bfbSAmjad Alsharafi            self.boot_sector.sectors_per_cluster,
362*c8f60bfbSAmjad Alsharafi        )
363*c8f60bfbSAmjad Alsharafi
364*c8f60bfbSAmjad Alsharafi    def write_cluster(self, cluster: int, data: bytes) -> None:
365*c8f60bfbSAmjad Alsharafi        """
366*c8f60bfbSAmjad Alsharafi        Write the cluster at the given cluster.
367*c8f60bfbSAmjad Alsharafi        """
368*c8f60bfbSAmjad Alsharafi        assert len(data) == self.boot_sector.cluster_bytes()
369*c8f60bfbSAmjad Alsharafi        self.write_sectors(
370*c8f60bfbSAmjad Alsharafi            self.boot_sector.first_sector_of_cluster(cluster),
371*c8f60bfbSAmjad Alsharafi            data,
372*c8f60bfbSAmjad Alsharafi        )
373*c8f60bfbSAmjad Alsharafi
374*c8f60bfbSAmjad Alsharafi    def read_directory(
375*c8f60bfbSAmjad Alsharafi        self, cluster: Optional[int]
376*c8f60bfbSAmjad Alsharafi    ) -> List[FatDirectoryEntry]:
377*c8f60bfbSAmjad Alsharafi        """
378*c8f60bfbSAmjad Alsharafi        Read the directory at the given cluster.
379*c8f60bfbSAmjad Alsharafi        """
380*c8f60bfbSAmjad Alsharafi        entries = []
381*c8f60bfbSAmjad Alsharafi        while cluster is not None:
382*c8f60bfbSAmjad Alsharafi            data = self.read_cluster(cluster)
383*c8f60bfbSAmjad Alsharafi            entries.extend(
384*c8f60bfbSAmjad Alsharafi                self.directory_from_bytes(
385*c8f60bfbSAmjad Alsharafi                    data, self.boot_sector.first_sector_of_cluster(cluster)
386*c8f60bfbSAmjad Alsharafi                )
387*c8f60bfbSAmjad Alsharafi            )
388*c8f60bfbSAmjad Alsharafi            cluster = self.next_cluster(cluster)
389*c8f60bfbSAmjad Alsharafi        return entries
390*c8f60bfbSAmjad Alsharafi
391*c8f60bfbSAmjad Alsharafi    def add_direntry(
392*c8f60bfbSAmjad Alsharafi        self, cluster: Optional[int], name: str, ext: str, attributes: int
393*c8f60bfbSAmjad Alsharafi    ) -> FatDirectoryEntry:
394*c8f60bfbSAmjad Alsharafi        """
395*c8f60bfbSAmjad Alsharafi        Add a new directory entry to the given cluster.
396*c8f60bfbSAmjad Alsharafi        If the cluster is `None`, then it will be added to the root directory.
397*c8f60bfbSAmjad Alsharafi        """
398*c8f60bfbSAmjad Alsharafi
399*c8f60bfbSAmjad Alsharafi        def find_free_entry(data: bytes) -> Optional[int]:
400*c8f60bfbSAmjad Alsharafi            for i in range(0, len(data), DIRENTRY_SIZE):
401*c8f60bfbSAmjad Alsharafi                entry = data[i : i + DIRENTRY_SIZE]
402*c8f60bfbSAmjad Alsharafi                if entry[0] == 0 or entry[0] == 0xE5:
403*c8f60bfbSAmjad Alsharafi                    return i
404*c8f60bfbSAmjad Alsharafi            return None
405*c8f60bfbSAmjad Alsharafi
406*c8f60bfbSAmjad Alsharafi        assert len(name) <= 8, "Name must be 8 characters or less"
407*c8f60bfbSAmjad Alsharafi        assert len(ext) <= 3, "Ext must be 3 characters or less"
408*c8f60bfbSAmjad Alsharafi        assert attributes % 0x15 != 0x15, "Invalid attributes"
409*c8f60bfbSAmjad Alsharafi
410*c8f60bfbSAmjad Alsharafi        # initial dummy data
411*c8f60bfbSAmjad Alsharafi        new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0)
412*c8f60bfbSAmjad Alsharafi        new_entry.name = name.ljust(8, " ")
413*c8f60bfbSAmjad Alsharafi        new_entry.ext = ext.ljust(3, " ")
414*c8f60bfbSAmjad Alsharafi        new_entry.attributes = attributes
415*c8f60bfbSAmjad Alsharafi        new_entry.reserved = 0
416*c8f60bfbSAmjad Alsharafi        new_entry.create_time_tenth = 0
417*c8f60bfbSAmjad Alsharafi        new_entry.create_time = 0
418*c8f60bfbSAmjad Alsharafi        new_entry.create_date = 0
419*c8f60bfbSAmjad Alsharafi        new_entry.last_access_date = 0
420*c8f60bfbSAmjad Alsharafi        new_entry.last_mod_time = 0
421*c8f60bfbSAmjad Alsharafi        new_entry.last_mod_date = 0
422*c8f60bfbSAmjad Alsharafi        new_entry.cluster = self.next_free_cluster()
423*c8f60bfbSAmjad Alsharafi        new_entry.size_bytes = 0
424*c8f60bfbSAmjad Alsharafi
425*c8f60bfbSAmjad Alsharafi        # mark as EOF
426*c8f60bfbSAmjad Alsharafi        self.write_fat_entry(new_entry.cluster, 0xFFFF)
427*c8f60bfbSAmjad Alsharafi
428*c8f60bfbSAmjad Alsharafi        if cluster is None:
429*c8f60bfbSAmjad Alsharafi            for i in range(self.boot_sector.root_dir_size()):
430*c8f60bfbSAmjad Alsharafi                sector_data = self.read_sectors(
431*c8f60bfbSAmjad Alsharafi                    self.boot_sector.root_dir_start() + i, 1
432*c8f60bfbSAmjad Alsharafi                )
433*c8f60bfbSAmjad Alsharafi                offset = find_free_entry(sector_data)
434*c8f60bfbSAmjad Alsharafi                if offset is not None:
435*c8f60bfbSAmjad Alsharafi                    new_entry.sector = self.boot_sector.root_dir_start() + i
436*c8f60bfbSAmjad Alsharafi                    new_entry.offset = offset
437*c8f60bfbSAmjad Alsharafi                    self.update_direntry(new_entry)
438*c8f60bfbSAmjad Alsharafi                    return new_entry
439*c8f60bfbSAmjad Alsharafi        else:
440*c8f60bfbSAmjad Alsharafi            while cluster is not None:
441*c8f60bfbSAmjad Alsharafi                data = self.read_cluster(cluster)
442*c8f60bfbSAmjad Alsharafi                offset = find_free_entry(data)
443*c8f60bfbSAmjad Alsharafi                if offset is not None:
444*c8f60bfbSAmjad Alsharafi                    new_entry.sector = (
445*c8f60bfbSAmjad Alsharafi                        self.boot_sector.first_sector_of_cluster(cluster)
446*c8f60bfbSAmjad Alsharafi                         + (offset // SECTOR_SIZE))
447*c8f60bfbSAmjad Alsharafi                    new_entry.offset = offset % SECTOR_SIZE
448*c8f60bfbSAmjad Alsharafi                    self.update_direntry(new_entry)
449*c8f60bfbSAmjad Alsharafi                    return new_entry
450*c8f60bfbSAmjad Alsharafi                cluster = self.next_cluster(cluster)
451*c8f60bfbSAmjad Alsharafi
452*c8f60bfbSAmjad Alsharafi        raise Exception("No free directory entries")
453*c8f60bfbSAmjad Alsharafi
454*c8f60bfbSAmjad Alsharafi    def update_direntry(self, entry: FatDirectoryEntry) -> None:
455*c8f60bfbSAmjad Alsharafi        """
456*c8f60bfbSAmjad Alsharafi        Write the directory entry back to the disk.
457*c8f60bfbSAmjad Alsharafi        """
458*c8f60bfbSAmjad Alsharafi        sector = self.read_sectors(entry.sector, 1)
459*c8f60bfbSAmjad Alsharafi        sector = (
460*c8f60bfbSAmjad Alsharafi            sector[: entry.offset]
461*c8f60bfbSAmjad Alsharafi            + entry.as_bytes()
462*c8f60bfbSAmjad Alsharafi            + sector[entry.offset + DIRENTRY_SIZE :]
463*c8f60bfbSAmjad Alsharafi        )
464*c8f60bfbSAmjad Alsharafi        self.write_sectors(entry.sector, sector)
465*c8f60bfbSAmjad Alsharafi
466*c8f60bfbSAmjad Alsharafi    def find_direntry(self, path: str) -> Optional[FatDirectoryEntry]:
467*c8f60bfbSAmjad Alsharafi        """
468*c8f60bfbSAmjad Alsharafi        Find the directory entry for the given path.
469*c8f60bfbSAmjad Alsharafi        """
470*c8f60bfbSAmjad Alsharafi        assert path[0] == "/", "Path must start with /"
471*c8f60bfbSAmjad Alsharafi
472*c8f60bfbSAmjad Alsharafi        path = path[1:]  # remove the leading /
473*c8f60bfbSAmjad Alsharafi        parts = path.split("/")
474*c8f60bfbSAmjad Alsharafi        directory = self.read_root_directory()
475*c8f60bfbSAmjad Alsharafi
476*c8f60bfbSAmjad Alsharafi        current_entry = None
477*c8f60bfbSAmjad Alsharafi
478*c8f60bfbSAmjad Alsharafi        for i, part in enumerate(parts):
479*c8f60bfbSAmjad Alsharafi            is_last = i == len(parts) - 1
480*c8f60bfbSAmjad Alsharafi
481*c8f60bfbSAmjad Alsharafi            for entry in directory:
482*c8f60bfbSAmjad Alsharafi                if entry.whole_name() == part:
483*c8f60bfbSAmjad Alsharafi                    current_entry = entry
484*c8f60bfbSAmjad Alsharafi                    break
485*c8f60bfbSAmjad Alsharafi            if current_entry is None:
486*c8f60bfbSAmjad Alsharafi                return None
487*c8f60bfbSAmjad Alsharafi
488*c8f60bfbSAmjad Alsharafi            if is_last:
489*c8f60bfbSAmjad Alsharafi                return current_entry
490*c8f60bfbSAmjad Alsharafi
491*c8f60bfbSAmjad Alsharafi            if current_entry.attributes & 0x10 == 0:
492*c8f60bfbSAmjad Alsharafi                raise Exception(
493*c8f60bfbSAmjad Alsharafi                    f"{current_entry.whole_name()} is not a directory"
494*c8f60bfbSAmjad Alsharafi                )
495*c8f60bfbSAmjad Alsharafi
496*c8f60bfbSAmjad Alsharafi            directory = self.read_directory(current_entry.cluster)
497*c8f60bfbSAmjad Alsharafi
498*c8f60bfbSAmjad Alsharafi        assert False, "Exited loop with is_last == False"
499*c8f60bfbSAmjad Alsharafi
500*c8f60bfbSAmjad Alsharafi    def read_file(self, entry: Optional[FatDirectoryEntry]) -> Optional[bytes]:
501*c8f60bfbSAmjad Alsharafi        """
502*c8f60bfbSAmjad Alsharafi        Read the content of the file at the given path.
503*c8f60bfbSAmjad Alsharafi        """
504*c8f60bfbSAmjad Alsharafi        if entry is None:
505*c8f60bfbSAmjad Alsharafi            return None
506*c8f60bfbSAmjad Alsharafi        if entry.attributes & 0x10 != 0:
507*c8f60bfbSAmjad Alsharafi            raise Exception(f"{entry.whole_name()} is a directory")
508*c8f60bfbSAmjad Alsharafi
509*c8f60bfbSAmjad Alsharafi        data = b""
510*c8f60bfbSAmjad Alsharafi        cluster: Optional[int] = entry.cluster
511*c8f60bfbSAmjad Alsharafi        while cluster is not None and len(data) <= entry.size_bytes:
512*c8f60bfbSAmjad Alsharafi            data += self.read_cluster(cluster)
513*c8f60bfbSAmjad Alsharafi            cluster = self.next_cluster(cluster)
514*c8f60bfbSAmjad Alsharafi        return data[: entry.size_bytes]
515*c8f60bfbSAmjad Alsharafi
516*c8f60bfbSAmjad Alsharafi    def truncate_file(
517*c8f60bfbSAmjad Alsharafi        self,
518*c8f60bfbSAmjad Alsharafi        entry: FatDirectoryEntry,
519*c8f60bfbSAmjad Alsharafi        new_size: int,
520*c8f60bfbSAmjad Alsharafi        allocate_non_continuous: bool = False,
521*c8f60bfbSAmjad Alsharafi    ) -> None:
522*c8f60bfbSAmjad Alsharafi        """
523*c8f60bfbSAmjad Alsharafi        Truncate the file at the given path to the new size.
524*c8f60bfbSAmjad Alsharafi        """
525*c8f60bfbSAmjad Alsharafi        if entry is None:
526*c8f60bfbSAmjad Alsharafi            raise Exception("entry is None")
527*c8f60bfbSAmjad Alsharafi        if entry.attributes & 0x10 != 0:
528*c8f60bfbSAmjad Alsharafi            raise Exception(f"{entry.whole_name()} is a directory")
529*c8f60bfbSAmjad Alsharafi
530*c8f60bfbSAmjad Alsharafi        def clusters_from_size(size: int) -> int:
531*c8f60bfbSAmjad Alsharafi            return (
532*c8f60bfbSAmjad Alsharafi                size + self.boot_sector.cluster_bytes() - 1
533*c8f60bfbSAmjad Alsharafi            ) // self.boot_sector.cluster_bytes()
534*c8f60bfbSAmjad Alsharafi
535*c8f60bfbSAmjad Alsharafi        # First, allocate new FATs if we need to
536*c8f60bfbSAmjad Alsharafi        required_clusters = clusters_from_size(new_size)
537*c8f60bfbSAmjad Alsharafi        current_clusters = clusters_from_size(entry.size_bytes)
538*c8f60bfbSAmjad Alsharafi
539*c8f60bfbSAmjad Alsharafi        affected_clusters = set()
540*c8f60bfbSAmjad Alsharafi
541*c8f60bfbSAmjad Alsharafi        # Keep at least one cluster, easier to manage this way
542*c8f60bfbSAmjad Alsharafi        if required_clusters == 0:
543*c8f60bfbSAmjad Alsharafi            required_clusters = 1
544*c8f60bfbSAmjad Alsharafi        if current_clusters == 0:
545*c8f60bfbSAmjad Alsharafi            current_clusters = 1
546*c8f60bfbSAmjad Alsharafi
547*c8f60bfbSAmjad Alsharafi        cluster: Optional[int]
548*c8f60bfbSAmjad Alsharafi
549*c8f60bfbSAmjad Alsharafi        if required_clusters > current_clusters:
550*c8f60bfbSAmjad Alsharafi            # Allocate new clusters
551*c8f60bfbSAmjad Alsharafi            cluster = entry.cluster
552*c8f60bfbSAmjad Alsharafi            to_add = required_clusters
553*c8f60bfbSAmjad Alsharafi            for _ in range(current_clusters - 1):
554*c8f60bfbSAmjad Alsharafi                to_add -= 1
555*c8f60bfbSAmjad Alsharafi                assert cluster is not None, "Cluster is None"
556*c8f60bfbSAmjad Alsharafi                affected_clusters.add(cluster)
557*c8f60bfbSAmjad Alsharafi                cluster = self.next_cluster(cluster)
558*c8f60bfbSAmjad Alsharafi            assert required_clusters > 0, "No new clusters to allocate"
559*c8f60bfbSAmjad Alsharafi            assert cluster is not None, "Cluster is None"
560*c8f60bfbSAmjad Alsharafi            assert (
561*c8f60bfbSAmjad Alsharafi                self.next_cluster(cluster) is None
562*c8f60bfbSAmjad Alsharafi            ), "Cluster is not the last cluster"
563*c8f60bfbSAmjad Alsharafi
564*c8f60bfbSAmjad Alsharafi            # Allocate new clusters
565*c8f60bfbSAmjad Alsharafi            for _ in range(to_add - 1):
566*c8f60bfbSAmjad Alsharafi                if allocate_non_continuous:
567*c8f60bfbSAmjad Alsharafi                    new_cluster = self.next_free_cluster_non_continuous()
568*c8f60bfbSAmjad Alsharafi                else:
569*c8f60bfbSAmjad Alsharafi                    new_cluster = self.next_free_cluster()
570*c8f60bfbSAmjad Alsharafi                self.write_fat_entry(cluster, new_cluster)
571*c8f60bfbSAmjad Alsharafi                self.write_fat_entry(new_cluster, 0xFFFF)
572*c8f60bfbSAmjad Alsharafi                cluster = new_cluster
573*c8f60bfbSAmjad Alsharafi
574*c8f60bfbSAmjad Alsharafi        elif required_clusters < current_clusters:
575*c8f60bfbSAmjad Alsharafi            # Truncate the file
576*c8f60bfbSAmjad Alsharafi            cluster = entry.cluster
577*c8f60bfbSAmjad Alsharafi            for _ in range(required_clusters - 1):
578*c8f60bfbSAmjad Alsharafi                assert cluster is not None, "Cluster is None"
579*c8f60bfbSAmjad Alsharafi                cluster = self.next_cluster(cluster)
580*c8f60bfbSAmjad Alsharafi            assert cluster is not None, "Cluster is None"
581*c8f60bfbSAmjad Alsharafi
582*c8f60bfbSAmjad Alsharafi            next_cluster = self.next_cluster(cluster)
583*c8f60bfbSAmjad Alsharafi            # mark last as EOF
584*c8f60bfbSAmjad Alsharafi            self.write_fat_entry(cluster, 0xFFFF)
585*c8f60bfbSAmjad Alsharafi            # free the rest
586*c8f60bfbSAmjad Alsharafi            while next_cluster is not None:
587*c8f60bfbSAmjad Alsharafi                cluster = next_cluster
588*c8f60bfbSAmjad Alsharafi                next_cluster = self.next_cluster(next_cluster)
589*c8f60bfbSAmjad Alsharafi                self.write_fat_entry(cluster, 0)
590*c8f60bfbSAmjad Alsharafi
591*c8f60bfbSAmjad Alsharafi        self.flush_fats()
592*c8f60bfbSAmjad Alsharafi
593*c8f60bfbSAmjad Alsharafi        # verify number of clusters
594*c8f60bfbSAmjad Alsharafi        cluster = entry.cluster
595*c8f60bfbSAmjad Alsharafi        count = 0
596*c8f60bfbSAmjad Alsharafi        while cluster is not None:
597*c8f60bfbSAmjad Alsharafi            count += 1
598*c8f60bfbSAmjad Alsharafi            affected_clusters.add(cluster)
599*c8f60bfbSAmjad Alsharafi            cluster = self.next_cluster(cluster)
600*c8f60bfbSAmjad Alsharafi        assert (
601*c8f60bfbSAmjad Alsharafi            count == required_clusters
602*c8f60bfbSAmjad Alsharafi        ), f"Expected {required_clusters} clusters, got {count}"
603*c8f60bfbSAmjad Alsharafi
604*c8f60bfbSAmjad Alsharafi        # update the size
605*c8f60bfbSAmjad Alsharafi        entry.size_bytes = new_size
606*c8f60bfbSAmjad Alsharafi        self.update_direntry(entry)
607*c8f60bfbSAmjad Alsharafi
608*c8f60bfbSAmjad Alsharafi        # trigger every affected cluster
609*c8f60bfbSAmjad Alsharafi        for cluster in affected_clusters:
610*c8f60bfbSAmjad Alsharafi            first_sector = self.boot_sector.first_sector_of_cluster(cluster)
611*c8f60bfbSAmjad Alsharafi            first_sector_data = self.read_sectors(first_sector, 1)
612*c8f60bfbSAmjad Alsharafi            self.write_sectors(first_sector, first_sector_data)
613*c8f60bfbSAmjad Alsharafi
614*c8f60bfbSAmjad Alsharafi    def write_file(self, entry: FatDirectoryEntry, data: bytes) -> None:
615*c8f60bfbSAmjad Alsharafi        """
616*c8f60bfbSAmjad Alsharafi        Write the content of the file at the given path.
617*c8f60bfbSAmjad Alsharafi        """
618*c8f60bfbSAmjad Alsharafi        if entry is None:
619*c8f60bfbSAmjad Alsharafi            raise Exception("entry is None")
620*c8f60bfbSAmjad Alsharafi        if entry.attributes & 0x10 != 0:
621*c8f60bfbSAmjad Alsharafi            raise Exception(f"{entry.whole_name()} is a directory")
622*c8f60bfbSAmjad Alsharafi
623*c8f60bfbSAmjad Alsharafi        data_len = len(data)
624*c8f60bfbSAmjad Alsharafi
625*c8f60bfbSAmjad Alsharafi        self.truncate_file(entry, data_len)
626*c8f60bfbSAmjad Alsharafi
627*c8f60bfbSAmjad Alsharafi        cluster: Optional[int] = entry.cluster
628*c8f60bfbSAmjad Alsharafi        while cluster is not None:
629*c8f60bfbSAmjad Alsharafi            data_to_write = data[: self.boot_sector.cluster_bytes()]
630*c8f60bfbSAmjad Alsharafi            if len(data_to_write) < self.boot_sector.cluster_bytes():
631*c8f60bfbSAmjad Alsharafi                old_data = self.read_cluster(cluster)
632*c8f60bfbSAmjad Alsharafi                data_to_write += old_data[len(data_to_write) :]
633*c8f60bfbSAmjad Alsharafi
634*c8f60bfbSAmjad Alsharafi            self.write_cluster(cluster, data_to_write)
635*c8f60bfbSAmjad Alsharafi            data = data[self.boot_sector.cluster_bytes() :]
636*c8f60bfbSAmjad Alsharafi            if len(data) == 0:
637*c8f60bfbSAmjad Alsharafi                break
638*c8f60bfbSAmjad Alsharafi            cluster = self.next_cluster(cluster)
639*c8f60bfbSAmjad Alsharafi
640*c8f60bfbSAmjad Alsharafi        assert (
641*c8f60bfbSAmjad Alsharafi            len(data) == 0
642*c8f60bfbSAmjad Alsharafi        ), "Data was not written completely, clusters missing"
643*c8f60bfbSAmjad Alsharafi
644*c8f60bfbSAmjad Alsharafi    def create_file(self, path: str) -> Optional[FatDirectoryEntry]:
645*c8f60bfbSAmjad Alsharafi        """
646*c8f60bfbSAmjad Alsharafi        Create a new file at the given path.
647*c8f60bfbSAmjad Alsharafi        """
648*c8f60bfbSAmjad Alsharafi        assert path[0] == "/", "Path must start with /"
649*c8f60bfbSAmjad Alsharafi
650*c8f60bfbSAmjad Alsharafi        path = path[1:]  # remove the leading /
651*c8f60bfbSAmjad Alsharafi
652*c8f60bfbSAmjad Alsharafi        parts = path.split("/")
653*c8f60bfbSAmjad Alsharafi
654*c8f60bfbSAmjad Alsharafi        directory_cluster = None
655*c8f60bfbSAmjad Alsharafi        directory = self.read_root_directory()
656*c8f60bfbSAmjad Alsharafi
657*c8f60bfbSAmjad Alsharafi        parts, filename = parts[:-1], parts[-1]
658*c8f60bfbSAmjad Alsharafi
659*c8f60bfbSAmjad Alsharafi        for _, part in enumerate(parts):
660*c8f60bfbSAmjad Alsharafi            current_entry = None
661*c8f60bfbSAmjad Alsharafi            for entry in directory:
662*c8f60bfbSAmjad Alsharafi                if entry.whole_name() == part:
663*c8f60bfbSAmjad Alsharafi                    current_entry = entry
664*c8f60bfbSAmjad Alsharafi                    break
665*c8f60bfbSAmjad Alsharafi            if current_entry is None:
666*c8f60bfbSAmjad Alsharafi                return None
667*c8f60bfbSAmjad Alsharafi
668*c8f60bfbSAmjad Alsharafi            if current_entry.attributes & 0x10 == 0:
669*c8f60bfbSAmjad Alsharafi                raise Exception(
670*c8f60bfbSAmjad Alsharafi                    f"{current_entry.whole_name()} is not a directory"
671*c8f60bfbSAmjad Alsharafi                )
672*c8f60bfbSAmjad Alsharafi
673*c8f60bfbSAmjad Alsharafi            directory = self.read_directory(current_entry.cluster)
674*c8f60bfbSAmjad Alsharafi            directory_cluster = current_entry.cluster
675*c8f60bfbSAmjad Alsharafi
676*c8f60bfbSAmjad Alsharafi        # add new entry to the directory
677*c8f60bfbSAmjad Alsharafi
678*c8f60bfbSAmjad Alsharafi        filename, ext = filename.split(".")
679*c8f60bfbSAmjad Alsharafi
680*c8f60bfbSAmjad Alsharafi        if len(ext) > 3:
681*c8f60bfbSAmjad Alsharafi            raise Exception("Ext must be 3 characters or less")
682*c8f60bfbSAmjad Alsharafi        if len(filename) > 8:
683*c8f60bfbSAmjad Alsharafi            raise Exception("Name must be 8 characters or less")
684*c8f60bfbSAmjad Alsharafi
685*c8f60bfbSAmjad Alsharafi        for c in filename + ext:
686*c8f60bfbSAmjad Alsharafi
687*c8f60bfbSAmjad Alsharafi            if c not in ALLOWED_FILE_CHARS:
688*c8f60bfbSAmjad Alsharafi                raise Exception("Invalid character in filename")
689*c8f60bfbSAmjad Alsharafi
690*c8f60bfbSAmjad Alsharafi        return self.add_direntry(directory_cluster, filename, ext, 0)
691