1import re
2import subprocess
3import os
4import shlex
5import shutil
6import sys
7
8from .terminal import terminals
9from .conffile import load
10
def cli_from_config(config, terminal_choice):
    """Build the FVP command line (a list of strings) from a loaded config.

    The result starts with the executable (prefixed with ``fvp-bindir`` when
    that is set), followed by the ``--parameter``, ``--data`` and
    ``--application`` options, the per-terminal parameters, and finally the
    entries of ``config["args"]``.

    *terminal_choice* is the key into ``terminals`` used for named terminals;
    the value ``"none"`` disables telnet for every terminal.
    """
    bindir = config["fvp-bindir"]
    executable = config["exe"]
    argv = [os.path.join(bindir, executable) if bindir else executable]

    for key, setting in config["parameters"].items():
        argv += ["--parameter", f"{key}={setting}"]

    for blob in config["data"]:
        argv += ["--data", blob]

    for key, setting in config["applications"].items():
        argv += ["--application", f"{key}={setting}"]

    terminals_enabled = terminal_choice != "none"
    for terminal, name in config["terminals"].items():
        # A terminal is only launched when terminals are enabled and this
        # terminal has been given a name; otherwise its telnet is disabled.
        if terminals_enabled and name:
            # TODO if raw mode, also pass "--parameter {terminal}.mode=raw"
            # TODO put name into terminal title
            terminal_setting = f"{terminal}.terminal_command={terminals[terminal_choice].command}"
        else:
            terminal_setting = f"{terminal}.start_telnet=0"
        argv += ["--parameter", terminal_setting]

    argv += config["args"]
    return argv
41
def check_telnet():
    """Raise RuntimeError unless a ``telnet`` executable is found by shutil.which."""
    telnet_path = shutil.which("telnet")
    if telnet_path:
        return
    raise RuntimeError("Cannot find telnet, this is needed to connect to the FVP.")
46
47
class ConsolePortParser:
    """Find the port each console listens on by scanning output lines.

    Lines of the form ``<console>: Listening for serial connection on port
    <port>`` are recognised. Every console seen while scanning is remembered,
    so later lookups for it return without consuming more input.
    """

    # Compiled once; matched against each stripped, decoded line.
    _PORT_LINE = re.compile(r"^(\S+): Listening for serial connection on port (\d+)$")

    def __init__(self, lines):
        """Wrap *lines*, an iterator yielding bytes objects (one per line)."""
        self._lines = lines
        self._console_ports = {}

    def parse_port(self, console):
        """Return the port number (int) announced for *console*.

        Consumes lines from the underlying iterator until the announcement
        for *console* is found. This call does not return until then: when
        the iterator is exhausted it is retried immediately, without any
        delay, because the source may be a log file that is still growing.
        """
        if console in self._console_ports:
            return self._console_ports[console]

        while True:
            try:
                raw = next(self._lines)
            except StopIteration:
                # self._lines might be a growing log file
                continue

            m = self._PORT_LINE.match(raw.strip().decode(errors='ignore'))
            if not m:
                continue

            matched_console = m.group(1)
            matched_port = int(m.group(2))
            # Remember every console, including the requested one: its line
            # has now been consumed, so a repeated lookup could otherwise
            # never find it again and would loop forever.
            self._console_ports[matched_console] = matched_port
            if matched_console == console:
                return matched_port
71
72
73# This function is backported from Python 3.8. Remove it and replace call sites
74# with shlex.join once OE-core support for earlier Python versions is dropped.
def shlex_join(split_command):
    """Join the arguments in *split_command* into one shell-quoted string."""
    quoted_words = [shlex.quote(word) for word in split_command]
    return ' '.join(quoted_words)
78
79
class FVPRunner:
    """Launch an FVP process and manage the telnet/pexpect clients attached to it."""

    def __init__(self, logger):
        """Create an idle runner; *logger* receives debug and info messages."""
        self._logger = logger
        self._fvp_process = None
        self._telnets = []
        self._pexpects = []
        self._config = None

    def start(self, fvpconf, extra_args=None, terminal_choice="none", stdout=subprocess.PIPE):
        """Load the configuration file *fvpconf* and start the FVP process.

        extra_args: optional iterable of arguments appended to the command
            line built from the configuration.
        terminal_choice: passed to cli_from_config to select the terminal.
        stdout: passed to subprocess.Popen; stderr is merged into it and
            stdin is connected to DEVNULL.
        """
        self._logger.debug(f"Loading {fvpconf}")
        self._config = load(fvpconf)

        cli = cli_from_config(self._config, terminal_choice)
        if extra_args is not None:
            cli += extra_args

        # Pass through environment variables needed for GUI applications, such
        # as xterm, to work. Work on a copy so the loaded configuration
        # (exposed through getConfig) is not modified.
        env = dict(self._config['env'])
        for name in ('DISPLAY', 'PATH', 'WAYLAND_DISPLAY', 'XAUTHORITY'):
            if name in os.environ:
                env[name] = os.environ[name]

        # Allow filepath to be relative to fvp configuration file
        cwd = os.path.dirname(fvpconf) or None
        self._logger.debug(f"FVP call will be executed in working directory: {cwd}")

        self._logger.debug(f"Constructed FVP call: {shlex_join(cli)}")
        self._fvp_process = subprocess.Popen(
            cli,
            stdin=subprocess.DEVNULL, stdout=stdout, stderr=subprocess.STDOUT,
            env=env,
            cwd=cwd)

    def _terminate(self, process, description, timeout):
        """Terminate *process*, killing it if it has not exited after *timeout* seconds."""
        self._logger.debug(f"Terminating {description} PID {process.pid}")
        try:
            process.terminate()
            process.wait(timeout)
        except subprocess.TimeoutExpired:
            self._logger.debug(f"Killing {description} PID {process.pid}")
            process.kill()
            # Reap the killed process so that it is not left behind as a
            # zombie and its returncode gets recorded.
            process.wait()
        except ProcessLookupError:
            pass

    def stop(self, timeout=10.0):
        """Stop the FVP and every telnet/pexpect client created by this runner.

        timeout: seconds to wait after terminate() before a process is killed.

        Returns the FVP's return code when it is positive, otherwise 0 (also
        when no FVP was started). The client lists are emptied afterwards, so
        calling stop() again does not touch already-closed clients.
        """
        if self._fvp_process:
            self._terminate(self._fvp_process, "FVP", timeout)

        for telnet in self._telnets:
            self._terminate(telnet, "telnet", timeout)
        self._telnets.clear()

        if self._pexpects:
            # Imported lazily so pexpect is only needed when consoles exist.
            import pexpect
            for console in self._pexpects:
                try:
                    # Ensure pexpect logs all remaining output to the logfile
                    console.expect(pexpect.EOF, timeout=5.0)
                finally:
                    # Close the console even if waiting for EOF failed.
                    console.close()
            self._pexpects.clear()

        returncode = self._fvp_process.returncode if self._fvp_process else None
        if returncode and returncode > 0:
            # Return codes < 0 indicate that the process was explicitly
            # terminated above.
            self._logger.info(f"FVP quit with code {returncode}")
            return returncode
        return 0

    def wait(self, timeout):
        """Wait up to *timeout* seconds for the FVP process to exit."""
        self._fvp_process.wait(timeout)

    def getConfig(self):
        """Return the configuration loaded by start(), or None before start()."""
        return self._config

    @property
    def stdout(self):
        """The stdout attribute of the FVP process."""
        return self._fvp_process.stdout

    def create_telnet(self, port):
        """Start a telnet client to localhost:*port* attached to this process's stdin/stdout.

        Raises RuntimeError (from check_telnet) when telnet is not available.
        Returns the subprocess.Popen object, which stop() will terminate.
        """
        check_telnet()
        telnet = subprocess.Popen(["telnet", "localhost", str(port)], stdin=sys.stdin, stdout=sys.stdout)
        self._telnets.append(telnet)
        return telnet

    def create_pexpect(self, port, **kwargs):
        """Spawn ``telnet localhost <port>`` under pexpect and return the spawn object.

        Extra keyword arguments are forwarded to pexpect.spawn. The instance
        is closed by stop().
        """
        import pexpect
        instance = pexpect.spawn(f"telnet localhost {port}", **kwargs)
        self._pexpects.append(instance)
        return instance

    def pid(self):
        """Return the PID of the FVP process."""
        return self._fvp_process.pid
173