xref: /openbmc/qemu/tests/functional/qemu_test/ports.py (revision 0f64fb674360393ae09605d8d53bf81c02c78a3e)
1#!/usr/bin/env python3
2#
3# Simple functional tests for VNC functionality
4#
5# Copyright 2018, 2024 Red Hat, Inc.
6#
7# This work is licensed under the terms of the GNU GPL, version 2 or
8# later.  See the COPYING file in the top-level directory.
9
10import fcntl
11import os
12import socket
13
14from .config import BUILD_DIR
15from typing import List
16
17
18class Ports():
19
20    PORTS_ADDR = '127.0.0.1'
21    PORTS_RANGE_SIZE = 1024
22    PORTS_START = 49152 + ((os.getpid() * PORTS_RANGE_SIZE) % 16384)
23    PORTS_END = PORTS_START + PORTS_RANGE_SIZE
24
25    def __enter__(self):
26        lock_file = os.path.join(BUILD_DIR, "tests", "functional", "port_lock")
27        self.lock_fh = os.open(lock_file, os.O_CREAT)
28        fcntl.flock(self.lock_fh, fcntl.LOCK_EX)
29        return self
30
31    def __exit__(self, exc_type, exc_value, traceback):
32        fcntl.flock(self.lock_fh, fcntl.LOCK_UN)
33        os.close(self.lock_fh)
34
35    def check_bind(self, port: int) -> bool:
36        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
37            try:
38                sock.bind((self.PORTS_ADDR, port))
39            except OSError:
40                return False
41
42        return True
43
44    def find_free_ports(self, count: int) -> List[int]:
45        result = []
46        for port in range(self.PORTS_START, self.PORTS_END):
47            if self.check_bind(port):
48                result.append(port)
49                if len(result) >= count:
50                    break
51        assert len(result) == count
52        return result
53
54    def find_free_port(self) -> int:
55        return self.find_free_ports(1)[0]
56