1import sys, os, re, struct, operator, math 2from collections import defaultdict 3import unittest 4 5sys.path.insert(0, os.getcwd()) 6 7import pybootchartgui.parsing as parsing 8import pybootchartgui.main as main 9 10debug = False 11 12def floatEq(f1, f2): 13 return math.fabs(f1-f2) < 0.00001 14 15bootchart_dir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/') 16parser = main._mk_options_parser() 17options, args = parser.parse_args(['--q', bootchart_dir]) 18writer = main._mk_writer(options) 19 20class TestBCParser(unittest.TestCase): 21 22 def setUp(self): 23 self.name = "My first unittest" 24 self.rootdir = bootchart_dir 25 26 def mk_fname(self,f): 27 return os.path.join(self.rootdir, f) 28 29 def testParseHeader(self): 30 trace = parsing.Trace(writer, args, options) 31 state = parsing.parse_file(writer, trace, self.mk_fname('header')) 32 self.assertEqual(6, len(state.headers)) 33 self.assertEqual(2, parsing.get_num_cpus(state.headers)) 34 35 def test_parseTimedBlocks(self): 36 trace = parsing.Trace(writer, args, options) 37 state = parsing.parse_file(writer, trace, self.mk_fname('proc_diskstats.log')) 38 self.assertEqual(141, len(state.disk_stats)) 39 40 def testParseProcPsLog(self): 41 trace = parsing.Trace(writer, args, options) 42 state = parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log')) 43 samples = state.ps_stats 44 processes = samples.process_map 45 sorted_processes = [processes[k] for k in sorted(processes.keys())] 46 47 ps_data = open(self.mk_fname('extract2.proc_ps.log')) 48 for index, line in enumerate(ps_data): 49 tokens = line.split(); 50 process = sorted_processes[index] 51 if debug: 52 print(tokens[0:4]) 53 print(process.pid / 1000, process.cmd, process.ppid, len(process.samples)) 54 print('-------------------') 55 56 self.assertEqual(tokens[0], str(process.pid // 1000)) 57 self.assertEqual(tokens[1], str(process.cmd)) 58 self.assertEqual(tokens[2], str(process.ppid // 1000)) 59 self.assertEqual(tokens[3], str(len(process.samples))) 60 ps_data.close() 61 62 def testparseProcDiskStatLog(self): 63 trace = parsing.Trace(writer, args, options) 64 state_with_headers = parsing.parse_file(writer, trace, self.mk_fname('header')) 65 state_with_headers.headers['system.cpu'] = 'xxx (2)' 66 samples = parsing.parse_file(writer, state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats 67 self.assertEqual(141, len(samples)) 68 69 diskstats_data = open(self.mk_fname('extract.proc_diskstats.log')) 70 for index, line in enumerate(diskstats_data): 71 tokens = line.split('\t') 72 sample = samples[index] 73 if debug: 74 print(line.rstrip()) 75 print(sample) 76 print('-------------------') 77 78 self.assertEqual(tokens[0], str(sample.time)) 79 self.assert_(floatEq(float(tokens[1]), sample.read)) 80 self.assert_(floatEq(float(tokens[2]), sample.write)) 81 self.assert_(floatEq(float(tokens[3]), sample.util)) 82 diskstats_data.close() 83 84 def testparseProcStatLog(self): 85 trace = parsing.Trace(writer, args, options) 86 samples = parsing.parse_file(writer, trace, self.mk_fname('proc_stat.log')).cpu_stats 87 self.assertEqual(141, len(samples)) 88 89 stat_data = open(self.mk_fname('extract.proc_stat.log')) 90 for index, line in enumerate(stat_data): 91 tokens = line.split('\t') 92 sample = samples[index] 93 if debug: 94 print(line.rstrip()) 95 print(sample) 96 print('-------------------') 97 self.assert_(floatEq(float(tokens[0]), sample.time)) 98 self.assert_(floatEq(float(tokens[1]), sample.user)) 99 self.assert_(floatEq(float(tokens[2]), sample.sys)) 100 self.assert_(floatEq(float(tokens[3]), sample.io)) 101 stat_data.close() 102 103if __name__ == '__main__': 104 unittest.main() 105 106