#
# SPDX-License-Identifier: MIT
#

import os
import sys
import json
import errno
import datetime
import itertools
from .commands import runCmd


class BaseDumper(object):
    """ Base class to dump commands from host/target """

    # Prefix used for dump file names; subclasses override it.
    _prefix = "unknown"

    def __init__(self, cmds, parent_dir):
        """Parse the command list and remember where dumps go.

        cmds is a newline-separated string of commands. Leading whitespace
        on each line is dropped, and empty lines or lines starting with '#'
        are skipped. When cmds is empty, a built-in default list is used.

        parent_dir is the directory under which create_dir() creates dump
        directories; it is also the initial dump directory.
        """
        self.cmds = []
        # Some testing doesn't inherit testimage, so it is needed
        # to set some defaults.
        self.parent_dir = parent_dir
        self.dump_dir = parent_dir
        dft_cmds = """  top -bn1
                        iostat -x -z -N -d -p ALL 20 2
                        ps -ef
                        free
                        df
                        memstat
                        dmesg
                        ip -s link
                        netstat -an"""
        if not cmds:
            cmds = dft_cmds
        for cmd in cmds.split('\n'):
            cmd = cmd.lstrip()
            if not cmd or cmd.startswith('#'):
                continue
            self.cmds.append(cmd)

    def create_dir(self, dir_suffix):
        """Create a '<YYYYmmddHHMM>_<dir_suffix>' directory under parent_dir.

        The new directory becomes self.dump_dir. An already existing
        directory is not an error; any other OSError is re-raised.
        """
        dump_subdir = ("%s_%s" % (
            datetime.datetime.now().strftime('%Y%m%d%H%M'),
            dir_suffix))
        dump_dir = os.path.join(self.parent_dir, dump_subdir)
        try:
            os.makedirs(dump_dir)
        except OSError as err:
            if err.errno != errno.EEXIST:
                # Bare raise keeps the original traceback intact.
                raise
        self.dump_dir = dump_dir

    def _construct_filename(self, command):
        """Return a not-yet-existing path in dump_dir for this command.

        The file name is '<prefix>_<NN>_<command>', where NN is the lowest
        counter for which no file exists yet.
        """
        # A command given with a path (e.g. /sbin/ifconfig) would otherwise
        # point into a non-existent subdirectory of dump_dir.
        safe_command = command.replace(os.sep, "_")
        for i in itertools.count():
            filename = "%s_%02d_%s" % (self._prefix, i, safe_command)
            fullname = os.path.join(self.dump_dir, filename)
            if not os.path.exists(fullname):
                return fullname

    def _write_dump(self, command, output):
        """Write output as text to a fresh dump file named after command."""
        fullname = self._construct_filename(command)
        with open(fullname, 'w') as dump_file:
            dump_file.write(output)


class HostDumper(BaseDumper):
    """ Class to get dumps from the host running the tests """

    _prefix = "host"

    def __init__(self, cmds, parent_dir):
        super(HostDumper, self).__init__(cmds, parent_dir)

    def dump_host(self, dump_dir=""):
        """Run every configured command on the host and save its output.

        A non-empty dump_dir replaces self.dump_dir before dumping.
        """
        if dump_dir:
            self.dump_dir = dump_dir
        # Run with a fixed PATH and a very wide COLUMNS value in an
        # otherwise inherited environment.
        env = os.environ.copy()
        env['PATH'] = '/usr/sbin:/sbin:/usr/bin:/bin'
        env['COLUMNS'] = '9999'
        for cmd in self.cmds:
            result = runCmd(cmd, ignore_status=True, env=env)
            self._write_dump(cmd.split()[0], result.output)


class TargetDumper(BaseDumper):
    """ Class to get dumps from target, it only works with QemuRunner """

    _prefix = "target"

    def __init__(self, cmds, parent_dir, runner):
        super(TargetDumper, self).__init__(cmds, parent_dir)
        self.runner = runner

    def dump_target(self, dump_dir=""):
        """Run every configured command via runner.run_serial and save it.

        A non-empty dump_dir replaces self.dump_dir before dumping.
        Failures are reported on stdout and do not stop the remaining
        commands.
        """
        if dump_dir:
            self.dump_dir = dump_dir
        for cmd in self.cmds:
            # We can continue with the testing if serial commands fail
            try:
                (_status, output) = self.runner.run_serial(cmd)
                self._write_dump(cmd.split()[0], output)
            except Exception:
                # Not a bare except: KeyboardInterrupt and SystemExit must
                # still be able to stop the run.
                print("Tried to dump info from target but "
                      "serial console failed")
                print("Failed CMD: %s" % (cmd))


class MonitorDumper(BaseDumper):
    """ Class to get dumps via the Qemu Monitor, it only works with QemuRunner """

    _prefix = "qmp"

    def __init__(self, cmds, parent_dir, runner):
        super(MonitorDumper, self).__init__(cmds, parent_dir)
        self.runner = runner

    def _write_dump(self, command, output):
        """Write output as indented JSON to a fresh dump file."""
        fullname = self._construct_filename(command)
        with open(fullname, 'w') as json_file:
            json.dump(output, json_file, indent=4)

    def dump_monitor(self, dump_dir=""):
        """Run every configured command via runner.run_monitor and save it.

        Each command is '<name>' or '<name> <json-arguments>'. A '%s' in
        the arguments is replaced with a fresh dump file path before the
        arguments are parsed as JSON. Does nothing when the runner is None.
        A non-empty dump_dir replaces self.dump_dir before dumping.
        Failures are reported on stdout and do not stop the remaining
        commands.
        """
        if self.runner is None:
            return
        if dump_dir:
            self.dump_dir = dump_dir
        for cmd in self.cmds:
            # Split only once so JSON arguments containing whitespace are
            # kept whole.
            parts = cmd.split(None, 1)
            cmd_name = parts[0]
            try:
                if len(parts) > 1:
                    cmd_args = parts[1]
                    if "%s" in cmd_args:
                        filename = self._construct_filename(cmd_name)
                        cmd_args = cmd_args % (filename)
                    # Arguments without a placeholder are passed on too,
                    # rather than leaving output unset for this command.
                    cmd_data = json.loads(cmd_args)
                    output = self.runner.run_monitor(cmd_name, cmd_data)
                else:
                    output = self.runner.run_monitor(cmd_name)
                self._write_dump(cmd_name, output)
            except Exception as e:
                print("Failed to dump QMP CMD: %s with\nExecption: %s" % (cmd_name, e))