xref: /openbmc/qemu/tests/qemu-iotests/fat16.py (revision 2e1cacfb)
1# A simple FAT16 driver that is used to test the `vvfat` driver in QEMU.
2#
3# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com>
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18from typing import Callable, List, Optional, Protocol, Set
19import string
20
21SECTOR_SIZE = 512
22DIRENTRY_SIZE = 32
23ALLOWED_FILE_CHARS = set(
24    "!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase
25)
26
27
28class MBR:
29    def __init__(self, data: bytes):
30        assert len(data) == 512
31        self.partition_table = []
32        for i in range(4):
33            partition = data[446 + i * 16 : 446 + (i + 1) * 16]
34            self.partition_table.append(
35                {
36                    "status": partition[0],
37                    "start_head": partition[1],
38                    "start_sector": partition[2] & 0x3F,
39                    "start_cylinder": ((partition[2] & 0xC0) << 2)
40                                      | partition[3],
41                    "type": partition[4],
42                    "end_head": partition[5],
43                    "end_sector": partition[6] & 0x3F,
44                    "end_cylinder": ((partition[6] & 0xC0) << 2)
45                                    | partition[7],
46                    "start_lba": int.from_bytes(partition[8:12], "little"),
47                    "size": int.from_bytes(partition[12:16], "little"),
48                }
49            )
50
51    def __str__(self):
52        return "\n".join(
53            [
54                f"{i}: {partition}"
55                for i, partition in enumerate(self.partition_table)
56            ]
57        )
58
59
60class FatBootSector:
61    # pylint: disable=too-many-instance-attributes
62    def __init__(self, data: bytes):
63        assert len(data) == 512
64        self.bytes_per_sector = int.from_bytes(data[11:13], "little")
65        self.sectors_per_cluster = data[13]
66        self.reserved_sectors = int.from_bytes(data[14:16], "little")
67        self.fat_count = data[16]
68        self.root_entries = int.from_bytes(data[17:19], "little")
69        total_sectors_16 = int.from_bytes(data[19:21], "little")
70        self.media_descriptor = data[21]
71        self.sectors_per_fat = int.from_bytes(data[22:24], "little")
72        self.sectors_per_track = int.from_bytes(data[24:26], "little")
73        self.heads = int.from_bytes(data[26:28], "little")
74        self.hidden_sectors = int.from_bytes(data[28:32], "little")
75        total_sectors_32 = int.from_bytes(data[32:36], "little")
76        assert (
77            total_sectors_16 == 0 or total_sectors_32 == 0
78        ), "Both total sectors (16 and 32) fields are non-zero"
79        self.total_sectors = total_sectors_16 or total_sectors_32
80        self.drive_number = data[36]
81        self.volume_id = int.from_bytes(data[39:43], "little")
82        self.volume_label = data[43:54].decode("ascii").strip()
83        self.fs_type = data[54:62].decode("ascii").strip()
84
85    def root_dir_start(self):
86        """
87        Calculate the start sector of the root directory.
88        """
89        return self.reserved_sectors + self.fat_count * self.sectors_per_fat
90
91    def root_dir_size(self):
92        """
93        Calculate the size of the root directory in sectors.
94        """
95        return (
96            self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1
97        ) // self.bytes_per_sector
98
99    def data_sector_start(self):
100        """
101        Calculate the start sector of the data region.
102        """
103        return self.root_dir_start() + self.root_dir_size()
104
105    def first_sector_of_cluster(self, cluster: int) -> int:
106        """
107        Calculate the first sector of the given cluster.
108        """
109        return (
110            self.data_sector_start() + (cluster - 2) * self.sectors_per_cluster
111        )
112
113    def cluster_bytes(self):
114        """
115        Calculate the number of bytes in a cluster.
116        """
117        return self.bytes_per_sector * self.sectors_per_cluster
118
119    def __str__(self):
120        return (
121            f"Bytes per sector: {self.bytes_per_sector}\n"
122            f"Sectors per cluster: {self.sectors_per_cluster}\n"
123            f"Reserved sectors: {self.reserved_sectors}\n"
124            f"FAT count: {self.fat_count}\n"
125            f"Root entries: {self.root_entries}\n"
126            f"Total sectors: {self.total_sectors}\n"
127            f"Media descriptor: {self.media_descriptor}\n"
128            f"Sectors per FAT: {self.sectors_per_fat}\n"
129            f"Sectors per track: {self.sectors_per_track}\n"
130            f"Heads: {self.heads}\n"
131            f"Hidden sectors: {self.hidden_sectors}\n"
132            f"Drive number: {self.drive_number}\n"
133            f"Volume ID: {self.volume_id}\n"
134            f"Volume label: {self.volume_label}\n"
135            f"FS type: {self.fs_type}\n"
136        )
137
138
139class FatDirectoryEntry:
140    # pylint: disable=too-many-instance-attributes
141    def __init__(self, data: bytes, sector: int, offset: int):
142        self.name = data[0:8].decode("ascii").strip()
143        self.ext = data[8:11].decode("ascii").strip()
144        self.attributes = data[11]
145        self.reserved = data[12]
146        self.create_time_tenth = data[13]
147        self.create_time = int.from_bytes(data[14:16], "little")
148        self.create_date = int.from_bytes(data[16:18], "little")
149        self.last_access_date = int.from_bytes(data[18:20], "little")
150        high_cluster = int.from_bytes(data[20:22], "little")
151        self.last_mod_time = int.from_bytes(data[22:24], "little")
152        self.last_mod_date = int.from_bytes(data[24:26], "little")
153        low_cluster = int.from_bytes(data[26:28], "little")
154        self.cluster = (high_cluster << 16) | low_cluster
155        self.size_bytes = int.from_bytes(data[28:32], "little")
156
157        # extra (to help write back to disk)
158        self.sector = sector
159        self.offset = offset
160
161    def as_bytes(self) -> bytes:
162        return (
163            self.name.ljust(8, " ").encode("ascii")
164            + self.ext.ljust(3, " ").encode("ascii")
165            + self.attributes.to_bytes(1, "little")
166            + self.reserved.to_bytes(1, "little")
167            + self.create_time_tenth.to_bytes(1, "little")
168            + self.create_time.to_bytes(2, "little")
169            + self.create_date.to_bytes(2, "little")
170            + self.last_access_date.to_bytes(2, "little")
171            + (self.cluster >> 16).to_bytes(2, "little")
172            + self.last_mod_time.to_bytes(2, "little")
173            + self.last_mod_date.to_bytes(2, "little")
174            + (self.cluster & 0xFFFF).to_bytes(2, "little")
175            + self.size_bytes.to_bytes(4, "little")
176        )
177
178    def whole_name(self):
179        if self.ext:
180            return f"{self.name}.{self.ext}"
181        else:
182            return self.name
183
184    def __str__(self):
185        return (
186            f"Name: {self.name}\n"
187            f"Ext: {self.ext}\n"
188            f"Attributes: {self.attributes}\n"
189            f"Reserved: {self.reserved}\n"
190            f"Create time tenth: {self.create_time_tenth}\n"
191            f"Create time: {self.create_time}\n"
192            f"Create date: {self.create_date}\n"
193            f"Last access date: {self.last_access_date}\n"
194            f"Last mod time: {self.last_mod_time}\n"
195            f"Last mod date: {self.last_mod_date}\n"
196            f"Cluster: {self.cluster}\n"
197            f"Size: {self.size_bytes}\n"
198        )
199
200    def __repr__(self):
201        # convert to dict
202        return str(vars(self))
203
204
205class SectorReader(Protocol):
206    def __call__(self, start_sector: int, num_sectors: int = 1) -> bytes: ...
207
208# pylint: disable=broad-exception-raised
209class Fat16:
210    def __init__(
211        self,
212        start_sector: int,
213        size: int,
214        sector_reader: SectorReader,
215        sector_writer: Callable[[int, bytes], None]
216    ):
217        self.start_sector = start_sector
218        self.size_in_sectors = size
219        self.sector_reader = sector_reader
220        self.sector_writer = sector_writer
221
222        self.boot_sector = FatBootSector(self.sector_reader(start_sector, 1))
223
224        fat_size_in_sectors = (
225            self.boot_sector.sectors_per_fat * self.boot_sector.fat_count
226        )
227        self.fats = self.read_sectors(
228            self.boot_sector.reserved_sectors, fat_size_in_sectors
229        )
230        self.fats_dirty_sectors: Set[int] = set()
231
232    def read_sectors(self, start_sector: int, num_sectors: int) -> bytes:
233        return self.sector_reader(start_sector + self.start_sector,
234                                  num_sectors)
235
236    def write_sectors(self, start_sector: int, data: bytes) -> None:
237        return self.sector_writer(start_sector + self.start_sector, data)
238
239    def directory_from_bytes(
240        self, data: bytes, start_sector: int
241    ) -> List[FatDirectoryEntry]:
242        """
243        Convert `bytes` into a list of `FatDirectoryEntry` objects.
244        Will ignore long file names.
245        Will stop when it encounters a 0x00 byte.
246        """
247
248        entries = []
249        for i in range(0, len(data), DIRENTRY_SIZE):
250            entry = data[i : i + DIRENTRY_SIZE]
251
252            current_sector = start_sector + (i // SECTOR_SIZE)
253            current_offset = i % SECTOR_SIZE
254
255            if entry[0] == 0:
256                break
257
258            if entry[0] == 0xE5:
259                # Deleted file
260                continue
261
262            if entry[11] & 0xF == 0xF:
263                # Long file name
264                continue
265
266            entries.append(
267                FatDirectoryEntry(entry, current_sector, current_offset)
268            )
269        return entries
270
271    def read_root_directory(self) -> List[FatDirectoryEntry]:
272        root_dir = self.read_sectors(
273            self.boot_sector.root_dir_start(), self.boot_sector.root_dir_size()
274        )
275        return self.directory_from_bytes(
276            root_dir, self.boot_sector.root_dir_start()
277        )
278
279    def read_fat_entry(self, cluster: int) -> int:
280        """
281        Read the FAT entry for the given cluster.
282        """
283        fat_offset = cluster * 2  # FAT16
284        return int.from_bytes(self.fats[fat_offset : fat_offset + 2], "little")
285
286    def write_fat_entry(self, cluster: int, value: int) -> None:
287        """
288        Write the FAT entry for the given cluster.
289        """
290        fat_offset = cluster * 2
291        self.fats = (
292            self.fats[:fat_offset]
293            + value.to_bytes(2, "little")
294            + self.fats[fat_offset + 2 :]
295        )
296        self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE)
297
298    def flush_fats(self) -> None:
299        """
300        Write the FATs back to the disk.
301        """
302        for sector in self.fats_dirty_sectors:
303            data = self.fats[sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE]
304            sector = self.boot_sector.reserved_sectors + sector
305            self.write_sectors(sector, data)
306        self.fats_dirty_sectors = set()
307
308    def next_cluster(self, cluster: int) -> Optional[int]:
309        """
310        Get the next cluster in the chain.
311        If its `None`, then its the last cluster.
312        The function will crash if the next cluster
313        is `FREE` (unexpected) or invalid entry.
314        """
315        fat_entry = self.read_fat_entry(cluster)
316        if fat_entry == 0:
317            raise Exception("Unexpected: FREE cluster")
318        if fat_entry == 1:
319            raise Exception("Unexpected: RESERVED cluster")
320        if fat_entry >= 0xFFF8:
321            return None
322        if fat_entry >= 0xFFF7:
323            raise Exception("Invalid FAT entry")
324
325        return fat_entry
326
327    def next_free_cluster(self) -> int:
328        """
329        Find the next free cluster.
330        """
331        # simple linear search
332        for i in range(2, 0xFFFF):
333            if self.read_fat_entry(i) == 0:
334                return i
335        raise Exception("No free clusters")
336
337    def next_free_cluster_non_continuous(self) -> int:
338        """
339        Find the next free cluster, but makes sure
340        that the cluster before and after it are not allocated.
341        """
342        # simple linear search
343        before = False
344        for i in range(2, 0xFFFF):
345            if self.read_fat_entry(i) == 0:
346                if before and self.read_fat_entry(i + 1) == 0:
347                    return i
348                else:
349                    before = True
350            else:
351                before = False
352
353        raise Exception("No free clusters")
354
355    def read_cluster(self, cluster: int) -> bytes:
356        """
357        Read the cluster at the given cluster.
358        """
359        return self.read_sectors(
360            self.boot_sector.first_sector_of_cluster(cluster),
361            self.boot_sector.sectors_per_cluster,
362        )
363
364    def write_cluster(self, cluster: int, data: bytes) -> None:
365        """
366        Write the cluster at the given cluster.
367        """
368        assert len(data) == self.boot_sector.cluster_bytes()
369        self.write_sectors(
370            self.boot_sector.first_sector_of_cluster(cluster),
371            data,
372        )
373
374    def read_directory(
375        self, cluster: Optional[int]
376    ) -> List[FatDirectoryEntry]:
377        """
378        Read the directory at the given cluster.
379        """
380        entries = []
381        while cluster is not None:
382            data = self.read_cluster(cluster)
383            entries.extend(
384                self.directory_from_bytes(
385                    data, self.boot_sector.first_sector_of_cluster(cluster)
386                )
387            )
388            cluster = self.next_cluster(cluster)
389        return entries
390
391    def add_direntry(
392        self, cluster: Optional[int], name: str, ext: str, attributes: int
393    ) -> FatDirectoryEntry:
394        """
395        Add a new directory entry to the given cluster.
396        If the cluster is `None`, then it will be added to the root directory.
397        """
398
399        def find_free_entry(data: bytes) -> Optional[int]:
400            for i in range(0, len(data), DIRENTRY_SIZE):
401                entry = data[i : i + DIRENTRY_SIZE]
402                if entry[0] == 0 or entry[0] == 0xE5:
403                    return i
404            return None
405
406        assert len(name) <= 8, "Name must be 8 characters or less"
407        assert len(ext) <= 3, "Ext must be 3 characters or less"
408        assert attributes % 0x15 != 0x15, "Invalid attributes"
409
410        # initial dummy data
411        new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0)
412        new_entry.name = name.ljust(8, " ")
413        new_entry.ext = ext.ljust(3, " ")
414        new_entry.attributes = attributes
415        new_entry.reserved = 0
416        new_entry.create_time_tenth = 0
417        new_entry.create_time = 0
418        new_entry.create_date = 0
419        new_entry.last_access_date = 0
420        new_entry.last_mod_time = 0
421        new_entry.last_mod_date = 0
422        new_entry.cluster = self.next_free_cluster()
423        new_entry.size_bytes = 0
424
425        # mark as EOF
426        self.write_fat_entry(new_entry.cluster, 0xFFFF)
427
428        if cluster is None:
429            for i in range(self.boot_sector.root_dir_size()):
430                sector_data = self.read_sectors(
431                    self.boot_sector.root_dir_start() + i, 1
432                )
433                offset = find_free_entry(sector_data)
434                if offset is not None:
435                    new_entry.sector = self.boot_sector.root_dir_start() + i
436                    new_entry.offset = offset
437                    self.update_direntry(new_entry)
438                    return new_entry
439        else:
440            while cluster is not None:
441                data = self.read_cluster(cluster)
442                offset = find_free_entry(data)
443                if offset is not None:
444                    new_entry.sector = (
445                        self.boot_sector.first_sector_of_cluster(cluster)
446                         + (offset // SECTOR_SIZE))
447                    new_entry.offset = offset % SECTOR_SIZE
448                    self.update_direntry(new_entry)
449                    return new_entry
450                cluster = self.next_cluster(cluster)
451
452        raise Exception("No free directory entries")
453
454    def update_direntry(self, entry: FatDirectoryEntry) -> None:
455        """
456        Write the directory entry back to the disk.
457        """
458        sector = self.read_sectors(entry.sector, 1)
459        sector = (
460            sector[: entry.offset]
461            + entry.as_bytes()
462            + sector[entry.offset + DIRENTRY_SIZE :]
463        )
464        self.write_sectors(entry.sector, sector)
465
466    def find_direntry(self, path: str) -> Optional[FatDirectoryEntry]:
467        """
468        Find the directory entry for the given path.
469        """
470        assert path[0] == "/", "Path must start with /"
471
472        path = path[1:]  # remove the leading /
473        parts = path.split("/")
474        directory = self.read_root_directory()
475
476        current_entry = None
477
478        for i, part in enumerate(parts):
479            is_last = i == len(parts) - 1
480
481            for entry in directory:
482                if entry.whole_name() == part:
483                    current_entry = entry
484                    break
485            if current_entry is None:
486                return None
487
488            if is_last:
489                return current_entry
490
491            if current_entry.attributes & 0x10 == 0:
492                raise Exception(
493                    f"{current_entry.whole_name()} is not a directory"
494                )
495
496            directory = self.read_directory(current_entry.cluster)
497
498        assert False, "Exited loop with is_last == False"
499
500    def read_file(self, entry: Optional[FatDirectoryEntry]) -> Optional[bytes]:
501        """
502        Read the content of the file at the given path.
503        """
504        if entry is None:
505            return None
506        if entry.attributes & 0x10 != 0:
507            raise Exception(f"{entry.whole_name()} is a directory")
508
509        data = b""
510        cluster: Optional[int] = entry.cluster
511        while cluster is not None and len(data) <= entry.size_bytes:
512            data += self.read_cluster(cluster)
513            cluster = self.next_cluster(cluster)
514        return data[: entry.size_bytes]
515
516    def truncate_file(
517        self,
518        entry: FatDirectoryEntry,
519        new_size: int,
520        allocate_non_continuous: bool = False,
521    ) -> None:
522        """
523        Truncate the file at the given path to the new size.
524        """
525        if entry is None:
526            raise Exception("entry is None")
527        if entry.attributes & 0x10 != 0:
528            raise Exception(f"{entry.whole_name()} is a directory")
529
530        def clusters_from_size(size: int) -> int:
531            return (
532                size + self.boot_sector.cluster_bytes() - 1
533            ) // self.boot_sector.cluster_bytes()
534
535        # First, allocate new FATs if we need to
536        required_clusters = clusters_from_size(new_size)
537        current_clusters = clusters_from_size(entry.size_bytes)
538
539        affected_clusters = set()
540
541        # Keep at least one cluster, easier to manage this way
542        if required_clusters == 0:
543            required_clusters = 1
544        if current_clusters == 0:
545            current_clusters = 1
546
547        cluster: Optional[int]
548
549        if required_clusters > current_clusters:
550            # Allocate new clusters
551            cluster = entry.cluster
552            to_add = required_clusters
553            for _ in range(current_clusters - 1):
554                to_add -= 1
555                assert cluster is not None, "Cluster is None"
556                affected_clusters.add(cluster)
557                cluster = self.next_cluster(cluster)
558            assert required_clusters > 0, "No new clusters to allocate"
559            assert cluster is not None, "Cluster is None"
560            assert (
561                self.next_cluster(cluster) is None
562            ), "Cluster is not the last cluster"
563
564            # Allocate new clusters
565            for _ in range(to_add - 1):
566                if allocate_non_continuous:
567                    new_cluster = self.next_free_cluster_non_continuous()
568                else:
569                    new_cluster = self.next_free_cluster()
570                self.write_fat_entry(cluster, new_cluster)
571                self.write_fat_entry(new_cluster, 0xFFFF)
572                cluster = new_cluster
573
574        elif required_clusters < current_clusters:
575            # Truncate the file
576            cluster = entry.cluster
577            for _ in range(required_clusters - 1):
578                assert cluster is not None, "Cluster is None"
579                cluster = self.next_cluster(cluster)
580            assert cluster is not None, "Cluster is None"
581
582            next_cluster = self.next_cluster(cluster)
583            # mark last as EOF
584            self.write_fat_entry(cluster, 0xFFFF)
585            # free the rest
586            while next_cluster is not None:
587                cluster = next_cluster
588                next_cluster = self.next_cluster(next_cluster)
589                self.write_fat_entry(cluster, 0)
590
591        self.flush_fats()
592
593        # verify number of clusters
594        cluster = entry.cluster
595        count = 0
596        while cluster is not None:
597            count += 1
598            affected_clusters.add(cluster)
599            cluster = self.next_cluster(cluster)
600        assert (
601            count == required_clusters
602        ), f"Expected {required_clusters} clusters, got {count}"
603
604        # update the size
605        entry.size_bytes = new_size
606        self.update_direntry(entry)
607
608        # trigger every affected cluster
609        for cluster in affected_clusters:
610            first_sector = self.boot_sector.first_sector_of_cluster(cluster)
611            first_sector_data = self.read_sectors(first_sector, 1)
612            self.write_sectors(first_sector, first_sector_data)
613
614    def write_file(self, entry: FatDirectoryEntry, data: bytes) -> None:
615        """
616        Write the content of the file at the given path.
617        """
618        if entry is None:
619            raise Exception("entry is None")
620        if entry.attributes & 0x10 != 0:
621            raise Exception(f"{entry.whole_name()} is a directory")
622
623        data_len = len(data)
624
625        self.truncate_file(entry, data_len)
626
627        cluster: Optional[int] = entry.cluster
628        while cluster is not None:
629            data_to_write = data[: self.boot_sector.cluster_bytes()]
630            if len(data_to_write) < self.boot_sector.cluster_bytes():
631                old_data = self.read_cluster(cluster)
632                data_to_write += old_data[len(data_to_write) :]
633
634            self.write_cluster(cluster, data_to_write)
635            data = data[self.boot_sector.cluster_bytes() :]
636            if len(data) == 0:
637                break
638            cluster = self.next_cluster(cluster)
639
640        assert (
641            len(data) == 0
642        ), "Data was not written completely, clusters missing"
643
644    def create_file(self, path: str) -> Optional[FatDirectoryEntry]:
645        """
646        Create a new file at the given path.
647        """
648        assert path[0] == "/", "Path must start with /"
649
650        path = path[1:]  # remove the leading /
651
652        parts = path.split("/")
653
654        directory_cluster = None
655        directory = self.read_root_directory()
656
657        parts, filename = parts[:-1], parts[-1]
658
659        for _, part in enumerate(parts):
660            current_entry = None
661            for entry in directory:
662                if entry.whole_name() == part:
663                    current_entry = entry
664                    break
665            if current_entry is None:
666                return None
667
668            if current_entry.attributes & 0x10 == 0:
669                raise Exception(
670                    f"{current_entry.whole_name()} is not a directory"
671                )
672
673            directory = self.read_directory(current_entry.cluster)
674            directory_cluster = current_entry.cluster
675
676        # add new entry to the directory
677
678        filename, ext = filename.split(".")
679
680        if len(ext) > 3:
681            raise Exception("Ext must be 3 characters or less")
682        if len(filename) > 8:
683            raise Exception("Name must be 8 characters or less")
684
685        for c in filename + ext:
686
687            if c not in ALLOWED_FILE_CHARS:
688                raise Exception("Invalid character in filename")
689
690        return self.add_direntry(directory_cluster, filename, ext, 0)
691