import re
import subprocess
import os
import shlex
import shutil
import sys

from .terminal import terminals
from .conffile import load


def cli_from_config(config, terminal_choice):
    """Build the FVP command line (an argv list) from a loaded configuration.

    Args:
        config: Mapping providing the keys "fvp-bindir", "exe", "parameters",
            "data", "applications", "terminals" and "args".
        terminal_choice: Key into ``terminals`` selecting how consoles are
            displayed, or "none" to disable every terminal.

    Returns:
        A list of strings: the executable followed by its arguments.
    """
    # Use the configured binary directory when there is one, otherwise pass
    # the bare executable name through unchanged.
    if config["fvp-bindir"]:
        cli = [os.path.join(config["fvp-bindir"], config["exe"])]
    else:
        cli = [config["exe"]]

    for param, value in config["parameters"].items():
        cli.extend(["--parameter", f"{param}={value}"])

    for value in config["data"]:
        cli.extend(["--data", value])

    for param, value in config["applications"].items():
        cli.extend(["--application", f"{param}={value}"])

    for terminal, name in config["terminals"].items():
        # If terminals are enabled and this terminal has been named
        if terminal_choice != "none" and name:
            # TODO if raw mode
            # cli.extend(["--parameter", f"{terminal}.mode=raw"])
            command = terminals[terminal_choice].command.format(name=name)
            cli.extend(["--parameter", f"{terminal}.terminal_command={command}"])
        else:
            # Disable terminal
            cli.extend(["--parameter", f"{terminal}.start_telnet=0"])

    cli.extend(config["args"])

    return cli


def check_telnet():
    """Raise RuntimeError when no ``telnet`` executable can be found."""
    if not shutil.which("telnet"):
        raise RuntimeError("Cannot find telnet, this is needed to connect to the FVP.")


class ConsolePortParser:
    """Extract console-to-port assignments from an iterator of output lines.

    Every "<console>: Listening for serial connection on port <N>" line seen
    is remembered, so each console can be looked up in any order and more
    than once.
    """

    # Compiled once; parse_port() applies it to every line it reads.
    _PORT_RE = re.compile(r"(\S+): Listening for serial connection on port (\d+)$")

    def __init__(self, lines):
        """Store *lines*, an iterator yielding bytes objects."""
        self._lines = lines
        self._console_ports = {}

    def parse_port(self, console):
        """Return the port number announced for *console*.

        Lines are consumed from the iterator until the announcement for
        *console* is found. This call does not return until that happens:
        when the iterator is exhausted it is polled again.
        """
        import time

        if console in self._console_ports:
            return self._console_ports[console]

        while True:
            try:
                raw = next(self._lines)
            except StopIteration:
                # self._lines might be a growing log file, so keep polling,
                # but yield the CPU between attempts instead of spinning.
                time.sleep(0.01)
                continue

            line = raw.strip().decode(errors='ignore')
            m = self._PORT_RE.search(line)
            if not m:
                continue

            matched_console = m.group(1)
            matched_port = int(m.group(2))
            # Remember every announcement, including the one being returned,
            # so a repeated lookup is answered from the cache rather than by
            # reading lines that will never contain it again.
            self._console_ports[matched_console] = matched_port
            if matched_console == console:
                return matched_port


# This
# function is backported from Python 3.8. Remove it and replace call sites
# with shlex.join once OE-core support for earlier Python versions is dropped.
def shlex_join(split_command):
    """Return a shell-escaped string from *split_command*."""
    return ' '.join(shlex.quote(arg) for arg in split_command)


class FVPRunner:
    """Start an FVP process and manage the console connections made to it."""

    def __init__(self, logger):
        """Create an idle runner that reports through *logger*."""
        self._logger = logger
        self._fvp_process = None
        self._telnets = []
        self._pexpects = []
        self._config = None

    def start(self, fvpconf, extra_args=None, terminal_choice="none", stdout=subprocess.PIPE):
        """Load the configuration file *fvpconf* and launch the FVP.

        Args:
            fvpconf: Path of the FVP configuration file. Its directory
                becomes the working directory of the FVP process.
            extra_args: Optional iterable of additional command line
                arguments appended after those from the configuration.
            terminal_choice: Passed to cli_from_config() to select how
                terminals are handled; "none" disables them.
            stdout: Destination for the FVP's stdout; stderr is merged
                into it.
        """
        self._logger.debug(f"Loading {fvpconf}")
        self._config = load(fvpconf)

        cli = cli_from_config(self._config, terminal_choice)
        if extra_args:
            cli.extend(extra_args)

        # Pass through environment variables needed for GUI applications, such
        # as xterm, to work. Work on a copy so the loaded configuration is
        # not modified as a side effect.
        env = dict(self._config['env'])
        for name in ('DISPLAY', 'PATH', 'WAYLAND_DISPLAY', 'XAUTHORITY'):
            if name in os.environ:
                env[name] = os.environ[name]

        # Allow filepath to be relative to fvp configuration file
        cwd = os.path.dirname(fvpconf) or None
        self._logger.debug(f"FVP call will be executed in working directory: {cwd}")

        self._logger.debug(f"Constructed FVP call: {shlex_join(cli)}")
        self._fvp_process = subprocess.Popen(
            cli,
            stdin=subprocess.DEVNULL, stdout=stdout, stderr=subprocess.STDOUT,
            env=env,
            cwd=cwd)

    def stop(self):
        """Shut down the FVP and every console attached through this runner.

        Returns:
            The FVP's exit code when it is greater than zero, otherwise 0
            (including when the FVP was never started or was terminated
            here).
        """
        if self._fvp_process:
            self._logger.debug(f"Terminating FVP PID {self._fvp_process.pid}")
            try:
                self._fvp_process.terminate()
                self._fvp_process.wait(10.0)
            except subprocess.TimeoutExpired:
                self._logger.debug(f"Killing FVP PID {self._fvp_process.pid}")
                self._reap_after_kill(self._fvp_process)
            except ProcessLookupError:
                pass

        for telnet in self._telnets:
            try:
                telnet.terminate()
                telnet.wait(10.0)
            except subprocess.TimeoutExpired:
                self._reap_after_kill(telnet)
            except ProcessLookupError:
                pass
        # Forget the handles so a repeated stop() does not revisit them.
        self._telnets = []

        if self._pexpects:
            # Imported lazily: only needed when create_pexpect() was used.
            import pexpect
            for console in self._pexpects:
                # Ensure pexpect logs all remaining output to the logfile
                try:
                    console.expect(pexpect.EOF, timeout=30.0)
                except pexpect.TIMEOUT:
                    pexpect_logfile = ""
                    if console.logfile is not None:
                        pexpect_logfile = f" ({console.logfile})"
                    self._logger.debug(f"Unable to get EOF on pexpect spawn obj{pexpect_logfile}.")
                console.close(force=True)
            self._pexpects = []

        if self._fvp_process and self._fvp_process.returncode and \
                self._fvp_process.returncode > 0:
            # Return codes < 0 indicate that the process was explicitly
            # terminated above.
            self._logger.info(f"FVP quit with code {self._fvp_process.returncode}")
            return self._fvp_process.returncode
        else:
            return 0

    @staticmethod
    def _reap_after_kill(process):
        """Kill *process* and wait briefly so it is reaped, not left a zombie."""
        try:
            process.kill()
            process.wait(10.0)
        except (subprocess.TimeoutExpired, ProcessLookupError):
            # Nothing more can be done for a process that is already gone
            # or that does not exit even after being killed.
            pass

    def wait(self, timeout):
        """Wait up to *timeout* seconds for the FVP process to exit."""
        self._fvp_process.wait(timeout)

    def getConfig(self):
        """Return the configuration loaded by start(), or None before it."""
        return self._config

    @property
    def stdout(self):
        """The stdout attribute of the FVP process object."""
        return self._fvp_process.stdout

    def create_telnet(self, port):
        """Spawn telnet to localhost:*port*, wired to this process's stdio.

        Raises:
            RuntimeError: If telnet is not available (see check_telnet()).
        """
        check_telnet()
        telnet = subprocess.Popen(["telnet", "localhost", str(port)], stdin=sys.stdin, stdout=sys.stdout)
        self._telnets.append(telnet)
        return telnet

    def create_pexpect(self, port, **kwargs):
        """Return a pexpect spawn of telnet to localhost:*port*.

        Extra keyword arguments are forwarded to pexpect.spawn().
        """
        import pexpect
        instance = pexpect.spawn(f"telnet localhost {port}", **kwargs)
        self._pexpects.append(instance)
        return instance

    def pid(self):
        """Return the process ID of the FVP process."""
        return self._fvp_process.pid