#!/usr/bin/env python3
#
# Test for preallocate filter
#
# Copyright (c) 2020 Virtuozzo International GmbH.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import iotests

MiB = 1024 ** 2

# Scratch images used by the tests; all of them live in the iotests test dir.
disk, overlay, refdisk = (
    os.path.join(iotests.test_dir, name)
    for name in ('disk', 'overlay', 'refdisk')
)

# Node graph described by these options (top to bottom):
#   'disk' (image format node) -> 'filter' (preallocate) -> 'file' (disk path)
drive_opts = ','.join((
    'node-name=disk',
    f'driver={iotests.imgfmt}',
    'file.node-name=filter',
    'file.driver=preallocate',
    'file.file.node-name=file',
    f'file.file.filename={disk}',
))


class TestPreallocateBase(iotests.QMPTestCase):
    """Shared fixture: a fresh 10 MiB image and file-size assertions."""

    def setUp(self):
        """Create the test image that drive_opts refers to."""
        iotests.qemu_img_create('-f', iotests.imgfmt, disk, str(10 * MiB))

    def tearDown(self):
        """Verify the final state of the image, then always remove it.

        The image file must be back below 10 MiB and 'qemu-img check' must
        report no leaks, no corruptions and no check errors.
        """
        try:
            self.check_small()
            check = iotests.qemu_img_check(disk)
            # assertNotIn/assertEqual print the offending data on failure,
            # which a bare assertFalse(... in ...) does not.
            self.assertNotIn('leaks', check)
            self.assertNotIn('corruptions', check)
            self.assertEqual(check['check-errors'], 0)
        finally:
            # Remove the image even when one of the checks above failed.
            os.remove(disk)

    def check_big(self):
        """Assert that the image file is larger than 100 MiB."""
        self.assertGreater(os.path.getsize(disk), 100 * MiB)

    def check_small(self):
        """Assert that the image file is smaller than 10 MiB."""
        self.assertLess(os.path.getsize(disk), 10 * MiB)


class TestQemuImg(TestPreallocateBase):
    """Exercise the preallocate filter through an interactive qemu-io."""

    def test_qemu_img(self):
        # A 1M write followed by a flush is expected to leave the image file
        # preallocated (check_big) for as long as qemu-io keeps it open.
        p = iotests.QemuIoInteractive('--image-opts', drive_opts)
        try:
            p.cmd('write 0 1M')
            p.cmd('flush')

            self.check_big()
        finally:
            # Close qemu-io even if a command or the size check fails;
            # otherwise the process would still be running when tearDown()
            # checks and removes the image.
            p.close()


class TestPreallocateFilter(TestPreallocateBase):
    """Exercise the preallocate filter inside a running VM."""

    def setUp(self):
        """Create the image and launch a VM with it attached via drive_opts."""
        super().setUp()
        self.vm = iotests.VM().add_drive(path=None, opts=drive_opts)
        self.vm.launch()

    def tearDown(self):
        """Shut the VM down, then run the base image checks and cleanup."""
        try:
            self.vm.shutdown()
        finally:
            # Run the base tearDown() even if the shutdown raised, so that
            # the test image is always removed.
            super().tearDown()

    @staticmethod
    def _remove_overlay():
        """Delete the snapshot overlay image if it exists."""
        try:
            os.remove(overlay)
        except FileNotFoundError:
            pass

    def test_prealloc(self):
        # A 1M guest write should make the filter preallocate the file.
        self.vm.hmp_qemu_io('drive0', 'write 0 1M')
        self.check_big()

    def test_external_snapshot(self):
        self.test_prealloc()

        # The snapshot command below creates the overlay file and nothing
        # else in this test removes it. Cleanups run after tearDown(), i.e.
        # once the VM has been shut down.
        self.addCleanup(self._remove_overlay)

        self.vm.cmd('blockdev-snapshot-sync', node_name='disk',
                    snapshot_file=overlay,
                    snapshot_node_name='overlay')

        # When the base is reopened read-only, preallocation should be
        # dropped
        self.check_small()

        self.vm.hmp_qemu_io('drive0', 'write 1M 1M')

        self.vm.cmd('block-commit', device='overlay')
        self.complete_and_wait()

        # Committing the new megabyte should trigger preallocation again
        self.check_big()

    def test_reopen_opts(self):
        # Reopen the same node graph with explicit preallocation settings.
        self.vm.cmd('blockdev-reopen', options=[{
            'node-name': 'disk',
            'driver': iotests.imgfmt,
            'file': {
                'node-name': 'filter',
                'driver': 'preallocate',
                'prealloc-size': 20 * MiB,
                'prealloc-align': 5 * MiB,
                'file': {
                    'node-name': 'file',
                    'driver': 'file',
                    'filename': disk
                }
            }
        }])

        self.vm.hmp_qemu_io('drive0', 'write 0 1M')
        # With the settings above the file is expected to be exactly 25 MiB;
        # assertEqual reports the actual size when it is not.
        self.assertEqual(os.path.getsize(disk), 25 * MiB)


class TestTruncate(iotests.QMPTestCase):
    """Compare truncation through the preallocate filter with a plain image.

    Each test runs the same write + truncate sequence on 'disk' (opened via
    drive_opts, i.e. through the filter) and on 'refdisk' (opened directly),
    then checks that both files end up with similar size and block usage.
    """

    def setUp(self):
        # Two identical 10 MiB images: the one under test and the reference.
        for image in (disk, refdisk):
            iotests.qemu_img_create('-f', iotests.imgfmt, image,
                                    str(10 * MiB))

    def tearDown(self):
        for image in (disk, refdisk):
            os.remove(image)

    def do_test(self, prealloc_mode, new_size):
        """Write 10M, truncate to new_size, and compare both images.

        prealloc_mode is passed to qemu-io's 'truncate -m'; new_size is the
        target size given to the same command.
        """
        commands = ('-c', 'write 0 10M',
                    '-c', f'truncate -m {prealloc_mode} {new_size}')

        # Same command sequence, first through the filter, then without it.
        iotests.qemu_io('--image-opts', *commands, drive_opts)
        iotests.qemu_io('-f', iotests.imgfmt, *commands, refdisk)

        filtered = os.stat(disk)
        reference = os.stat(refdisk)

        # When shrinking, the preallocate filter may keep the file aligned
        # to a cluster, so tolerate a small difference in file size.
        size_diff = abs(filtered.st_size - reference.st_size)
        self.assertLess(size_diff, 64 * 1024)

        # The filter only drops trailing preallocation. Clusters skipped by
        # a write far beyond EOF, for example, stay fallocated, so allow up
        # to 1 MiB of difference in allocated blocks.
        blocks_diff = abs(filtered.st_blocks - reference.st_blocks)
        self.assertLess(blocks_diff * 512, 1024 * 1024)

    def test_real_shrink(self):
        self.do_test(prealloc_mode='off', new_size='5M')

    def test_truncate_inside_preallocated_area__falloc(self):
        self.do_test(prealloc_mode='falloc', new_size='50M')

    def test_truncate_inside_preallocated_area__metadata(self):
        self.do_test(prealloc_mode='metadata', new_size='50M')

    def test_truncate_inside_preallocated_area__full(self):
        self.do_test(prealloc_mode='full', new_size='50M')

    def test_truncate_inside_preallocated_area__off(self):
        self.do_test(prealloc_mode='off', new_size='50M')

    def test_truncate_over_preallocated_area__falloc(self):
        self.do_test(prealloc_mode='falloc', new_size='150M')

    def test_truncate_over_preallocated_area__metadata(self):
        self.do_test(prealloc_mode='metadata', new_size='150M')

    def test_truncate_over_preallocated_area__full(self):
        self.do_test(prealloc_mode='full', new_size='150M')

    def test_truncate_over_preallocated_area__off(self):
        self.do_test(prealloc_mode='off', new_size='150M')


if __name__ == '__main__':
    # Entry point when run as a script: hand control to the iotests runner,
    # declaring qcow2 as the only supported image format and 'preallocate'
    # as a required format.
    iotests.main(
        supported_fmts=['qcow2'],
        required_fmts=['preallocate'],
    )