xref: /openbmc/qemu/tests/functional/qemu_test/ports.py (revision 1ada452efc7d8f8bf42cd5e8a2af1b4ac9167a1f)
1*b7edbbf4SThomas Huth#!/usr/bin/env python3
2*b7edbbf4SThomas Huth#
3*b7edbbf4SThomas Huth# Simple functional tests for VNC functionality
4*b7edbbf4SThomas Huth#
5*b7edbbf4SThomas Huth# Copyright 2018, 2024 Red Hat, Inc.
6*b7edbbf4SThomas Huth#
7*b7edbbf4SThomas Huth# This work is licensed under the terms of the GNU GPL, version 2 or
8*b7edbbf4SThomas Huth# later.  See the COPYING file in the top-level directory.
9*b7edbbf4SThomas Huth
10*b7edbbf4SThomas Huthimport fcntl
11*b7edbbf4SThomas Huthimport os
12*b7edbbf4SThomas Huthimport socket
13*b7edbbf4SThomas Huthimport sys
14*b7edbbf4SThomas Huthimport tempfile
15*b7edbbf4SThomas Huth
16*b7edbbf4SThomas Huthfrom .config import BUILD_DIR
17*b7edbbf4SThomas Huthfrom typing import List
18*b7edbbf4SThomas Huth
19*b7edbbf4SThomas Huthclass Ports():
20*b7edbbf4SThomas Huth
21*b7edbbf4SThomas Huth    PORTS_ADDR = '127.0.0.1'
22*b7edbbf4SThomas Huth    PORTS_RANGE_SIZE = 1024
23*b7edbbf4SThomas Huth    PORTS_START = 49152 + ((os.getpid() * PORTS_RANGE_SIZE) % 16384)
24*b7edbbf4SThomas Huth    PORTS_END = PORTS_START + PORTS_RANGE_SIZE
25*b7edbbf4SThomas Huth
26*b7edbbf4SThomas Huth    def __enter__(self):
27*b7edbbf4SThomas Huth        lock_file = os.path.join(BUILD_DIR, "tests", "functional", "port_lock")
28*b7edbbf4SThomas Huth        self.lock_fh = os.open(lock_file, os.O_CREAT)
29*b7edbbf4SThomas Huth        fcntl.flock(self.lock_fh, fcntl.LOCK_EX)
30*b7edbbf4SThomas Huth        return self
31*b7edbbf4SThomas Huth
32*b7edbbf4SThomas Huth    def __exit__(self, exc_type, exc_value, traceback):
33*b7edbbf4SThomas Huth        fcntl.flock(self.lock_fh, fcntl.LOCK_UN)
34*b7edbbf4SThomas Huth        os.close(self.lock_fh)
35*b7edbbf4SThomas Huth
36*b7edbbf4SThomas Huth    def check_bind(self, port: int) -> bool:
37*b7edbbf4SThomas Huth        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
38*b7edbbf4SThomas Huth            try:
39*b7edbbf4SThomas Huth                sock.bind((self.PORTS_ADDR, port))
40*b7edbbf4SThomas Huth            except OSError:
41*b7edbbf4SThomas Huth                return False
42*b7edbbf4SThomas Huth
43*b7edbbf4SThomas Huth        return True
44*b7edbbf4SThomas Huth
45*b7edbbf4SThomas Huth    def find_free_ports(self, count: int) -> List[int]:
46*b7edbbf4SThomas Huth        result = []
47*b7edbbf4SThomas Huth        for port in range(self.PORTS_START, self.PORTS_END):
48*b7edbbf4SThomas Huth            if self.check_bind(port):
49*b7edbbf4SThomas Huth                result.append(port)
50*b7edbbf4SThomas Huth                if len(result) >= count:
51*b7edbbf4SThomas Huth                    break
52*b7edbbf4SThomas Huth        assert len(result) == count
53*b7edbbf4SThomas Huth        return result
54*b7edbbf4SThomas Huth
55*b7edbbf4SThomas Huth    def find_free_port(self) -> int:
56*b7edbbf4SThomas Huth        return self.find_free_ports(1)[0]
57