#
# SPDX-License-Identifier: MIT
#

import os
import sys
import json
import errno
import datetime
import itertools
from .commands import runCmd


class BaseDumper(object):
    """ Base class to dump commands from host/target """

    def __init__(self, cmds, parent_dir):
        """
        Parse the list of commands to dump.

        cmds: newline-separated string of commands; blank lines and lines
              whose first non-blank character is '#' are skipped. When
              empty/None a built-in default list is used.
        parent_dir: directory under which dumps are written; it is also the
                    initial dump directory until create_dir() or a dump_*()
                    call changes it.
        """
        self.cmds = []
        # Some testing doesn't inherit testimage, so it is needed
        # to set some defaults.
        self.parent_dir = parent_dir
        self.dump_dir = parent_dir
        dft_cmds = """  top -bn1
                        iostat -x -z -N -d -p ALL 20 2
                        ps -ef
                        free
                        df
                        memstat
                        dmesg
                        ip -s link
                        netstat -an"""
        if not cmds:
            cmds = dft_cmds
        for cmd in cmds.split('\n'):
            # Only leading whitespace is removed; the rest of the line is
            # kept verbatim as the command.
            cmd = cmd.lstrip()
            if not cmd or cmd[0] == '#':
                continue
            self.cmds.append(cmd)

    def create_dir(self, dir_suffix):
        """
        Create "<timestamp>_<dir_suffix>" under parent_dir and make it the
        current dump directory. An already existing path is not an error;
        any other OSError is propagated.
        """
        dump_subdir = ("%s_%s" % (
                datetime.datetime.now().strftime('%Y%m%d%H%M'),
                dir_suffix))
        dump_dir = os.path.join(self.parent_dir, dump_subdir)
        try:
            os.makedirs(dump_dir)
        except OSError as err:
            if err.errno != errno.EEXIST:
                # Bare raise re-raises the active exception unchanged.
                raise
        self.dump_dir = dump_dir

    def _construct_filename(self, command):
        """
        Return the first path of the form "<prefix>_<NN>_<command>" inside
        the current dump directory that does not exist yet. The prefix
        depends on the kind of dumper (host, target, qmp or unknown).
        """
        if isinstance(self, HostDumper):
            prefix = "host"
        elif isinstance(self, TargetDumper):
            prefix = "target"
        elif isinstance(self, MonitorDumper):
            prefix = "qmp"
        else:
            prefix = "unknown"
        # Bump the counter until an unused name is found so that earlier
        # dumps of the same command are never overwritten.
        for i in itertools.count():
            filename = "%s_%02d_%s" % (prefix, i, command)
            fullname = os.path.join(self.dump_dir, filename)
            if not os.path.exists(fullname):
                break
        return fullname

    def _write_dump(self, command, output):
        """
        Write output to a new file named after command. Monitor dumps are
        serialized as indented JSON, everything else is written as text.
        """
        fullname = self._construct_filename(command)
        # The dump directory may not exist yet (and the command name may
        # itself contain path separators), so create the parents first.
        os.makedirs(os.path.dirname(fullname), exist_ok=True)
        if isinstance(self, MonitorDumper):
            with open(fullname, 'w') as json_file:
                json.dump(output, json_file, indent=4)
        else:
            with open(fullname, 'w') as dump_file:
                dump_file.write(output)


class HostDumper(BaseDumper):
    """ Class to get dumps from the host running the tests """

    def __init__(self, cmds, parent_dir):
        super().__init__(cmds, parent_dir)

    def dump_host(self, dump_dir=""):
        """
        Run every configured command on the host and store each output in
        its own file. A non-empty dump_dir replaces the current dump
        directory.
        """
        if dump_dir:
            self.dump_dir = dump_dir
        env = os.environ.copy()
        # Fixed PATH that includes the sbin directories, and a very wide
        # COLUMNS value -- presumably to keep tools from truncating lines.
        env['PATH'] = '/usr/sbin:/sbin:/usr/bin:/bin'
        env['COLUMNS'] = '9999'
        for cmd in self.cmds:
            result = runCmd(cmd, ignore_status=True, env=env)
            # The dump file is named after the executable only.
            self._write_dump(cmd.split()[0], result.output)


class TargetDumper(BaseDumper):
    """ Class to get dumps from target, it only works with QemuRunner """

    def __init__(self, cmds, parent_dir, runner):
        super().__init__(cmds, parent_dir)
        self.runner = runner

    def dump_target(self, dump_dir=""):
        """
        Run every configured command through the runner's serial console
        and store each output in its own file. A failing command is
        reported on stdout and does not stop the remaining ones. A
        non-empty dump_dir replaces the current dump directory.
        """
        if dump_dir:
            self.dump_dir = dump_dir
        for cmd in self.cmds:
            # We can continue with the testing if serial commands fail
            try:
                (_status, output) = self.runner.run_serial(cmd)
                self._write_dump(cmd.split()[0], output)
            except Exception as e:
                # Catch Exception rather than everything so that
                # KeyboardInterrupt/SystemExit still abort the run.
                print("Tried to dump info from target but "
                        "serial console failed")
                print("Failed CMD: %s" % (cmd))
                print("Exception: %s" % (e))


class MonitorDumper(BaseDumper):
    """ Class to get dumps via the Qemu Monitor, it only works with QemuRunner """

    def __init__(self, cmds, parent_dir, runner):
        super().__init__(cmds, parent_dir)
        self.runner = runner

    def dump_monitor(self, dump_dir=""):
        """
        Run every configured command through the runner's monitor and store
        each result as JSON. Does nothing when there is no runner. A failing
        command is reported on stdout and does not stop the remaining ones.

        Each command is "<name> [<json arguments>]". A "%s" inside the
        arguments is replaced with the path of the dump file before the
        arguments are parsed as JSON. A non-empty dump_dir replaces the
        current dump directory.
        """
        if self.runner is None:
            return
        if dump_dir:
            self.dump_dir = dump_dir
        for cmd in self.cmds:
            # Split off the command name only, so that JSON arguments
            # containing whitespace stay in one piece.
            parts = cmd.split(None, 1)
            cmd_name = parts[0]
            cmd_args = parts[1] if len(parts) > 1 else ""
            try:
                if cmd_args:
                    if "%s" in cmd_args:
                        filename = self._construct_filename(cmd_name)
                        cmd_args = cmd_args % (filename)
                    # Always parse the arguments, so a command without
                    # "%s" never runs with missing or stale data.
                    cmd_data = json.loads(cmd_args)
                    output = self.runner.run_monitor(cmd_name, cmd_data)
                else:
                    output = self.runner.run_monitor(cmd_name)
                self._write_dump(cmd_name, output)
            except Exception as e:
                print("Failed to dump QMP CMD: %s with\nException: %s" % (cmd_name, e))