1import re 2import subprocess 3import os 4import shlex 5import shutil 6import sys 7 8from .terminal import terminals 9from .conffile import load 10 11def cli_from_config(config, terminal_choice): 12 cli = [] 13 if config["fvp-bindir"]: 14 cli.append(os.path.join(config["fvp-bindir"], config["exe"])) 15 else: 16 cli.append(config["exe"]) 17 18 for param, value in config["parameters"].items(): 19 cli.extend(["--parameter", f"{param}={value}"]) 20 21 for value in config["data"]: 22 cli.extend(["--data", value]) 23 24 for param, value in config["applications"].items(): 25 cli.extend(["--application", f"{param}={value}"]) 26 27 for terminal, name in config["terminals"].items(): 28 # If terminals are enabled and this terminal has been named 29 if terminal_choice != "none" and name: 30 # TODO if raw mode 31 # cli.extend(["--parameter", f"{terminal}.mode=raw"]) 32 # TODO put name into terminal title 33 cli.extend(["--parameter", f"{terminal}.terminal_command={terminals[terminal_choice].command}"]) 34 else: 35 # Disable terminal 36 cli.extend(["--parameter", f"{terminal}.start_telnet=0"]) 37 38 cli.extend(config["args"]) 39 40 return cli 41 42def check_telnet(): 43 # Check that telnet is present 44 if not bool(shutil.which("telnet")): 45 raise RuntimeError("Cannot find telnet, this is needed to connect to the FVP.") 46 47 48class ConsolePortParser: 49 def __init__(self, lines): 50 self._lines = lines 51 self._console_ports = {} 52 53 def parse_port(self, console): 54 if console in self._console_ports: 55 return self._console_ports[console] 56 57 while True: 58 try: 59 line = next(self._lines).strip().decode(errors='ignore') 60 m = re.match(r"^(\S+): Listening for serial connection on port (\d+)$", line) 61 if m: 62 matched_console = m.group(1) 63 matched_port = int(m.group(2)) 64 if matched_console == console: 65 return matched_port 66 else: 67 self._console_ports[matched_console] = matched_port 68 except StopIteration: 69 # self._lines might be a growing log file 70 pass 71 72 73# This function is backported from Python 3.8. Remove it and replace call sites 74# with shlex.join once OE-core support for earlier Python versions is dropped. 75def shlex_join(split_command): 76 """Return a shell-escaped string from *split_command*.""" 77 return ' '.join(shlex.quote(arg) for arg in split_command) 78 79 80class FVPRunner: 81 def __init__(self, logger): 82 self._logger = logger 83 self._fvp_process = None 84 self._telnets = [] 85 self._pexpects = [] 86 self._config = None 87 88 def start(self, fvpconf, extra_args=[], terminal_choice="none", stdout=subprocess.PIPE): 89 self._logger.debug(f"Loading {fvpconf}") 90 self._config = load(fvpconf) 91 92 cli = cli_from_config(self._config, terminal_choice) 93 cli += extra_args 94 95 # Pass through environment variables needed for GUI applications, such 96 # as xterm, to work. 97 env = self._config['env'] 98 for name in ('DISPLAY', 'PATH', 'WAYLAND_DISPLAY', 'XAUTHORITY'): 99 if name in os.environ: 100 env[name] = os.environ[name] 101 102 # Allow filepath to be relative to fvp configuration file 103 cwd = os.path.dirname(fvpconf) or None 104 self._logger.debug(f"FVP call will be executed in working directory: {cwd}") 105 106 self._logger.debug(f"Constructed FVP call: {shlex_join(cli)}") 107 self._fvp_process = subprocess.Popen( 108 cli, 109 stdin=subprocess.DEVNULL, stdout=stdout, stderr=subprocess.STDOUT, 110 env=env, 111 cwd=cwd) 112 113 def stop(self): 114 if self._fvp_process: 115 self._logger.debug(f"Terminating FVP PID {self._fvp_process.pid}") 116 try: 117 self._fvp_process.terminate() 118 self._fvp_process.wait(10.0) 119 except subprocess.TimeoutExpired: 120 self._logger.debug(f"Killing FVP PID {self._fvp_process.pid}") 121 self._fvp_process.kill() 122 except ProcessLookupError: 123 pass 124 125 for telnet in self._telnets: 126 try: 127 telnet.terminate() 128 telnet.wait(10.0) 129 except subprocess.TimeoutExpired: 130 telnet.kill() 131 except ProcessLookupError: 132 pass 133 134 for console in self._pexpects: 135 import pexpect 136 # Ensure pexpect logs all remaining output to the logfile 137 console.expect(pexpect.EOF, timeout=5.0) 138 console.close() 139 140 if self._fvp_process and self._fvp_process.returncode and \ 141 self._fvp_process.returncode > 0: 142 # Return codes < 0 indicate that the process was explicitly 143 # terminated above. 144 self._logger.info(f"FVP quit with code {self._fvp_process.returncode}") 145 return self._fvp_process.returncode 146 else: 147 return 0 148 149 def wait(self, timeout): 150 self._fvp_process.wait(timeout) 151 152 def getConfig(self): 153 return self._config 154 155 @property 156 def stdout(self): 157 return self._fvp_process.stdout 158 159 def create_telnet(self, port): 160 check_telnet() 161 telnet = subprocess.Popen(["telnet", "localhost", str(port)], stdin=sys.stdin, stdout=sys.stdout) 162 self._telnets.append(telnet) 163 return telnet 164 165 def create_pexpect(self, port, **kwargs): 166 import pexpect 167 instance = pexpect.spawn(f"telnet localhost {port}", **kwargs) 168 self._pexpects.append(instance) 169 return instance 170 171 def pid(self): 172 return self._fvp_process.pid 173