#!/usr/bin/env python3
# group: rw auto quick
#
# Test case for ejecting BDSs with block jobs still running on them
#
# Originally written in bash by Hanna Czenczek, ported to Python by Stefan
# Hajnoczi.
#
# Copyright Red Hat
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import iotests

# Filters shared by several QMP commands below; they mask values that vary in
# the test output
QMP_FILTERS = [
    iotests.filter_qmp_testfiles,
    iotests.filter_qmp_imgfmt,
]


class TestCase:
    """Context manager wrapping one "delete a node with a job on it" case.

    Entering the context logs a header and adds the image at *image_path* as
    node 'drv0'.  The body of the ``with`` statement is expected to start a
    block job with the ID 'job0' on that node.  Leaving the context attempts
    to delete the node while the job still exists, cancels the job, waits
    for *cancel_event*, and finally deletes the node.
    """

    def __init__(self, name: str, vm, image_path, cancel_event: str) -> None:
        """Store the parameters of the test case.

        name: label printed in the '=== Testing ... ===' header
        vm: object used to issue QMP commands (qmp_log) and to wait for
            events (event_wait)
        image_path: filename of the image opened as node 'drv0'
        cancel_event: name of the QMP event to wait for after
                      block-job-cancel has been issued
        """
        self.name = name
        self.vm = vm
        self.image_path = image_path
        self.cancel_event = cancel_event

    def __enter__(self) -> 'TestCase':
        """Log the test header, add node 'drv0' and return this object."""
        iotests.log(f'=== Testing {self.name} ===')
        self.vm.qmp_log('blockdev-add',
                        node_name='drv0',
                        driver=iotests.imgfmt,
                        file={'driver': 'file', 'filename': self.image_path},
                        filters=QMP_FILTERS)
        # Return self so that "with TestCase(...) as case" binds the object
        # instead of None
        return self

    def __exit__(self, *exc_details) -> None:
        """Try to delete the node, cancel the job, then delete the node.

        Returns None, so an exception raised in the ``with`` body is not
        suppressed.
        """
        # This is expected to fail because the job still exists
        self.vm.qmp_log('blockdev-del', node_name='drv0',
                        filters=[iotests.filter_qmp_generated_node_ids])

        self.vm.qmp_log('block-job-cancel', device='job0')
        event = self.vm.event_wait(self.cancel_event)
        assert event is not None  # silence mypy
        iotests.log(event, filters=[iotests.filter_qmp_event])

        # This time it succeeds
        self.vm.qmp_log('blockdev-del', node_name='drv0')

        # Separate test cases in output
        iotests.log('')


def main() -> None:
    """Run every "delete a node while a block job uses it" test case.

    Creates a bottom <- middle <- top backing file chain, launches a VM and
    then, once per job type, starts job 'job0' on node 'drv0' inside a
    TestCase context; the blockdev-del / block-job-cancel sequence itself is
    performed by TestCase when the context is left.
    """
    with iotests.FilePath('bottom', 'middle', 'top', 'target') as \
            (bottom_path, middle_path, top_path, target_path), \
         iotests.VM() as vm:

        fmt = iotests.imgfmt
        # Used both as the image size and as the length of the writes below
        image_size = '1M'

        def log_job_ready() -> None:
            """Wait for BLOCK_JOB_READY and log the filtered event."""
            ready = vm.event_wait('BLOCK_JOB_READY')
            assert ready is not None  # silence mypy
            iotests.log(ready, filters=[iotests.filter_qmp_event])

        iotests.log('Creating bottom <- middle <- top backing file chain...')
        iotests.qemu_img_create('-f', fmt, bottom_path, image_size)
        # Every overlay is backed by the previous image in the chain
        for backing, overlay in ((bottom_path, middle_path),
                                 (middle_path, top_path)):
            iotests.qemu_img_create('-f', fmt, '-F', fmt, '-b', backing,
                                    overlay, image_size)

        iotests.log('Starting VM...')
        vm.add_args('-nodefaults')
        vm.launch()

        # drive-backup and drive-mirror are started with the same arguments
        copy_job_args = {
            'job_id': 'job0',
            'device': 'drv0',
            'target': target_path,
            'format': fmt,
            'sync': 'none',
        }

        # A drive-backup job does not emit BLOCK_JOB_READY on its own, so
        # cancelling it produces BLOCK_JOB_CANCELLED.
        with TestCase('drive-backup', vm, top_path, 'BLOCK_JOB_CANCELLED'):
            vm.qmp_log('drive-backup', **copy_job_args, filters=QMP_FILTERS)

        # A drive-mirror job emits BLOCK_JOB_READY almost at once, so
        # cancelling it produces BLOCK_JOB_COMPLETED.
        with TestCase('drive-mirror', vm, top_path, 'BLOCK_JOB_COMPLETED'):
            vm.qmp_log('drive-mirror', **copy_job_args, filters=QMP_FILTERS)
            log_job_ready()

        # An active block-commit job also emits BLOCK_JOB_READY almost at
        # once, so cancelling it produces BLOCK_JOB_COMPLETED as well.
        with TestCase('active block-commit', vm, top_path,
                      'BLOCK_JOB_COMPLETED'):
            vm.qmp_log('block-commit', job_id='job0', device='drv0')
            log_job_ready()

        # Without data to commit the job would finish right away, emit
        # BLOCK_JOB_COMPLETED, and the node could then be ejected with no
        # job running on it; write to the middle image to prevent that.
        iotests.qemu_io(middle_path, '-c', f'write 0 {image_size}')
        with TestCase('non-active block-commit', vm, top_path,
                      'BLOCK_JOB_CANCELLED'):
            vm.qmp_log('block-commit', job_id='job0', device='drv0',
                       top=middle_path, speed=1,
                       filters=[iotests.filter_qmp_testfiles])

        # Likewise, block-stream needs data to copy or it would finish right
        # away, emit BLOCK_JOB_COMPLETED, and the node could be ejected with
        # no job running on it; write to the bottom image to prevent that.
        iotests.qemu_io(bottom_path, '-c', f'write 0 {image_size}')
        with TestCase('block-stream', vm, top_path, 'BLOCK_JOB_CANCELLED'):
            vm.qmp_log('block-stream', job_id='job0', device='drv0', speed=1)

# Script entry point: hand main() to the iotests runner together with the
# image formats and protocols this test is declared to support
if __name__ == '__main__':
    iotests.script_main(
        main,
        supported_fmts=['qcow2', 'qed'],
        supported_protocols=['file'],
    )