xref: /openbmc/openbmc/meta-arm/meta-arm/lib/fvp/runner.py (revision 96e4b4e121e0e2da1535d7d537d6a982a6ff5bc0)
1import re
2import subprocess
3import os
4import shlex
5import shutil
6import sys
7
8from .terminal import terminals
9from .conffile import load
10
11def cli_from_config(config, terminal_choice):
12    cli = []
13    if config["fvp-bindir"]:
14        cli.append(os.path.join(config["fvp-bindir"], config["exe"]))
15    else:
16        cli.append(config["exe"])
17
18    for param, value in config["parameters"].items():
19        cli.extend(["--parameter", f"{param}={value}"])
20
21    for value in config["data"]:
22        cli.extend(["--data", value])
23
24    for param, value in config["applications"].items():
25        cli.extend(["--application", f"{param}={value}"])
26
27    for terminal, name in config["terminals"].items():
28        # If terminals are enabled and this terminal has been named
29        if terminal_choice != "none" and name:
30            # TODO if raw mode
31            # cli.extend(["--parameter", f"{terminal}.mode=raw"])
32            cli.extend(["--parameter", f"{terminal}.terminal_command={terminals[terminal_choice].command.format(name=name)}"])
33        else:
34            # Disable terminal
35            cli.extend(["--parameter", f"{terminal}.start_telnet=0"])
36
37    cli.extend(config["args"])
38
39    return cli
40
41def check_telnet():
42    # Check that telnet is present
43    if not bool(shutil.which("telnet")):
44        raise RuntimeError("Cannot find telnet, this is needed to connect to the FVP.")
45
46
47class ConsolePortParser:
48    def __init__(self, lines):
49        self._lines = lines
50        self._console_ports = {}
51
52    def parse_port(self, console):
53        if console in self._console_ports:
54            return self._console_ports[console]
55
56        while True:
57            try:
58                line = next(self._lines).strip().decode(errors='ignore')
59                m = re.search(r"(\S+): Listening for serial connection on port (\d+)$", line)
60                if m:
61                    matched_console = m.group(1)
62                    matched_port = int(m.group(2))
63                    if matched_console == console:
64                        return matched_port
65                    else:
66                        self._console_ports[matched_console] = matched_port
67            except StopIteration:
68                # self._lines might be a growing log file
69                pass
70
71
72# This function is backported from Python 3.8. Remove it and replace call sites
73# with shlex.join once OE-core support for earlier Python versions is dropped.
74def shlex_join(split_command):
75    """Return a shell-escaped string from *split_command*."""
76    return ' '.join(shlex.quote(arg) for arg in split_command)
77
78
79class FVPRunner:
80    def __init__(self, logger):
81        self._logger = logger
82        self._fvp_process = None
83        self._telnets = []
84        self._pexpects = []
85        self._config = None
86
87    def start(self, fvpconf, extra_args=[], terminal_choice="none", stdout=subprocess.PIPE):
88        self._logger.debug(f"Loading {fvpconf}")
89        self._config = load(fvpconf)
90
91        cli = cli_from_config(self._config, terminal_choice)
92        cli += extra_args
93
94        # Pass through environment variables needed for GUI applications, such
95        # as xterm, to work.
96        env = self._config['env']
97        for name in ('DISPLAY', 'PATH', 'WAYLAND_DISPLAY', 'XAUTHORITY'):
98            if name in os.environ:
99                env[name] = os.environ[name]
100
101        # Allow filepath to be relative to fvp configuration file
102        cwd = os.path.dirname(fvpconf) or None
103        self._logger.debug(f"FVP call will be executed in working directory: {cwd}")
104
105        self._logger.debug(f"Constructed FVP call: {shlex_join(cli)}")
106        self._fvp_process = subprocess.Popen(
107            cli,
108            stdin=subprocess.DEVNULL, stdout=stdout, stderr=subprocess.STDOUT,
109            env=env,
110            cwd=cwd)
111
112    def stop(self):
113        if self._fvp_process:
114            self._logger.debug(f"Terminating FVP PID {self._fvp_process.pid}")
115            try:
116                self._fvp_process.terminate()
117                self._fvp_process.wait(10.0)
118            except subprocess.TimeoutExpired:
119                self._logger.debug(f"Killing FVP PID {self._fvp_process.pid}")
120                self._fvp_process.kill()
121            except ProcessLookupError:
122                pass
123
124        for telnet in self._telnets:
125            try:
126                telnet.terminate()
127                telnet.wait(10.0)
128            except subprocess.TimeoutExpired:
129                telnet.kill()
130            except ProcessLookupError:
131                pass
132
133        for console in self._pexpects:
134            import pexpect
135            # Ensure pexpect logs all remaining output to the logfile
136            try:
137                console.expect(pexpect.EOF, timeout=30.0)
138            except pexpect.TIMEOUT:
139                pexpect_logfile = ""
140                if console.logfile is not None:
141                    pexpect_logfile = f" ({console.logfile})"
142                self._logger.debug(f"Unable to get EOF on pexpect spawn obj{pexpect_logfile}.")
143            console.close(force=True)
144
145        if self._fvp_process and self._fvp_process.returncode and \
146                self._fvp_process.returncode > 0:
147            # Return codes < 0 indicate that the process was explicitly
148            # terminated above.
149            self._logger.info(f"FVP quit with code {self._fvp_process.returncode}")
150            return self._fvp_process.returncode
151        else:
152            return 0
153
154    def wait(self, timeout):
155        self._fvp_process.wait(timeout)
156
157    def getConfig(self):
158        return self._config
159
160    @property
161    def stdout(self):
162        return self._fvp_process.stdout
163
164    def create_telnet(self, port):
165        check_telnet()
166        telnet = subprocess.Popen(["telnet", "localhost", str(port)], stdin=sys.stdin, stdout=sys.stdout)
167        self._telnets.append(telnet)
168        return telnet
169
170    def create_pexpect(self, port, **kwargs):
171        import pexpect
172        instance = pexpect.spawn(f"telnet localhost {port}", **kwargs)
173        self._pexpects.append(instance)
174        return instance
175
176    def pid(self):
177        return self._fvp_process.pid
178