1#!/usr/bin/python3 2 3# SPDX-License-Identifier: Apache-2.0 4# Copyright 2020 IBM Corp. 5 6import argparse 7import csv 8import enum 9from collections import namedtuple 10from datetime import time, timedelta 11 12import crc8 13 14 15class UCD90320Command(bytes, enum.Enum): 16 def __new__(cls, value, width): 17 obj = bytes.__new__(cls, [value]) 18 obj._value_ = value 19 obj.width = width 20 return obj 21 22 MONITOR_CONFIG = (0xD5, -1) 23 NUM_PAGES = (0xD6, 1) 24 GPIO_SELECT = (0xFA, 1) 25 GPIO_CONFIG = (0xFB, 1) 26 DEVICE_ID = (0xFD, -1) 27 28 29class PMBusCommand(bytes, enum.Enum): 30 def __new__(cls, value, width): 31 obj = bytes.__new__(cls, [value]) 32 obj._value_ = value 33 obj.width = width 34 return obj 35 36 PAGE = (0x00, 1) 37 OPERATION = (0x01, 1) 38 ON_OFF_CONFIG = (0x02, 1) 39 CLEAR_FAULTS = (0x03, 0) 40 PHASE = (0x04, 1) 41 PAGE_PLUS_WRITE = (0x05, -1) 42 PAGE_PLUS_READ = (0x06, -1) 43 WRITE_PROTECT = (0x10, 1) 44 STORE_DEFAULT_ALL = (0x11, 0) 45 RESTORE_DEFAULT_ALL = (0x12, 0) 46 STORE_DEFAULT_CODE = (0x13, 1) 47 RESTORE_DEFAULT_CODE = (0x14, 1) 48 STORE_USER_ALL = (0x15, 0) 49 RESTORE_USER_ALL = (0x16, 0) 50 STORE_USER_CODE = (0x17, 1) 51 RESTORE_USER_CODE = (0x18, 1) 52 CAPABILITY = (0x19, 1) 53 QUERY = (0x1A, 1) 54 SMBALERT_MASK = (0x1B, 2) 55 VOUT_MODE = (0x20, 1) 56 VOUT_COMMAND = (0x21, 2) 57 VOUT_CAL_OFFSET = (0x23, 2) 58 POUT_MAX = (0x31, 2) 59 FREQUENCY_SWITCH = (0x33, 2) 60 VIN_OFF = (0x36, 2) 61 FAN_COMMAND_1 = (0x3B, 2) 62 FAN_COMMAND_4 = (0x3F, 2) 63 VOUT_OV_FAULT_LIMIT = (0x40, 2) 64 VOUT_OV_WARN_LIMIT = (0x42, 2) 65 VOUT_UV_WARN_LIMIT = (0x43, 2) 66 VOUT_UV_FAULT_LIMIT = (0x44, 2) 67 IOUT_OC_LV_FAULT_LIMIT = (0x48, 2) 68 IOUT_OC_LV_FAULT_RESPONSE = (0x49, 1) 69 IOUT_UC_FAULT_RESPONSE = (0x4C, 1) 70 OT_FAULT_LIMIT = (0x4F, 2) 71 OT_WARN_LIMIT = (0x51, 2) 72 UT_WARN_LIMIT = (0x52, 2) 73 UT_FAULT_LIMIT = (0x53, 2) 74 VIN_UV_FAULT_LIMIT = (0x59, 2) 75 IIN_OC_FAULT_RESPONSE = (0x5C, 1) 76 TOFF_MAX_WARN_LIMIT = (0x66, 2) 77 STATUS_WORD = (0x79, 2) 78 STATUS_CML = (0x7E, 1) 79 STATUS_OTHER = (0x7F, 1) 80 STATUS_MFR_SPECIFIC = (0x80, 1) 81 READ_TEMPERATURE_3 = (0x8F, 2) 82 PMBUS_REVISION = (0x98, 1) 83 MFR_MODEL = (0x9A, -1) 84 IC_DEVICE_REV = (0xAE, -1) 85 USER_DATA_00 = (0xB0, -1) 86 USER_DATA_08 = (0xB8, -1) 87 MFR_SPECIFIC_05 = (0xD5, None) 88 MFR_SPECIFIC_06 = (0xD6, None) 89 MFR_SPECIFIC_42 = (0xFA, None) 90 MFR_SPECIFIC_43 = (0xFB, None) 91 MFR_SPECIFIC_45 = (0xFD, None) 92 93 94class I2CCondition(enum.Enum): 95 START = 0 96 STOP = 1 97 98 99class I2CRecord(enum.Enum): 100 WRITE = 0 101 READ = 1 102 103 104class I2CResponse(enum.Enum): 105 ACK = 0 106 NACK = 1 107 108 109# Level,Index,m:s.ms.us,Dur,Len,Err,S/P,Addr,Record,Data 110# 0,1,0:29.722.525,210.600 us,1 B,,S,32,Write Transaction,0E 111I2CTransfer = namedtuple( 112 "I2CTransfer", 113 ( 114 "level", 115 "index", 116 "timestamp", 117 "duration", 118 "length", 119 "error", 120 "conditions", 121 "address", 122 "record", 123 "data", 124 ), 125) 126Timestamp = namedtuple( 127 "Timestamp", ["minutes", "seconds", "milliseconds", "microseconds"] 128) 129I2CData = namedtuple("I2CData", ["response", "data"]) 130 131SMBusTransfer = namedtuple("SMBusTransfer", ["command", "response"]) 132 133 134def to_duration(field): 135 if field.endswith("us"): 136 if "." in field: 137 ms, us, _ = 0, *field.split(".") 138 else: 139 ms, us = 0, int(field.rstrip("us")) 140 elif field.endswith("ms"): 141 if "." in field: 142 ms, us, _ = field.split(".") 143 else: 144 ms, us = int(field.rstrip("ms")), 0 145 else: 146 raise ValueError 147 return timedelta(milliseconds=int(ms), microseconds=int(us)) 148 149 150def to_timestamp(field): 151 ts = Timestamp(*list(int(v) for v in field.replace(":", ".").split("."))) 152 return time( 153 0, ts.minutes, ts.seconds, ts.milliseconds * 1000 + ts.microseconds 154 ) 155 156 157def to_i2cdata(field): 158 resp = I2CResponse.NACK if field.endswith("*") else I2CResponse.ACK 159 return I2CData(resp, bytes(int(v, 16) for v in field.rstrip("*").split())) 160 161 162def to_address(field): 163 return int(field, 16) 164 165 166def to_i2cconditions(field): 167 if "S" == field: 168 return {I2CCondition.START} 169 elif "SP" == field: 170 return {I2CCondition.START, I2CCondition.STOP} 171 raise ValueError 172 173 174def to_i2crecord(field): 175 if "Write Transaction" == field: 176 return I2CRecord.WRITE 177 if "Read Transaction" == field: 178 return I2CRecord.READ 179 raise ValueError 180 181 182def to_i2ctransfer(line): 183 return I2CTransfer( 184 *line[:2], 185 to_timestamp(line[2]), 186 to_duration(line[3]), 187 *line[4:6], 188 to_i2cconditions(line[6]), 189 to_address(line[7]), 190 to_i2crecord(line[8]), 191 to_i2cdata(line[9]) 192 ) 193 194 195def pmbuscommand_style(xfer): 196 return PMBusCommand(xfer.data.data[0]) 197 198 199def ucd90320command_style(xfer): 200 try: 201 return UCD90320Command(xfer.data.data[0]) 202 except Exception: 203 return pmbuscommand_style(xfer) 204 205 206def as_smbustransfers(i2cxfers, style): 207 command = None 208 for i2cxfer in i2cxfers: 209 if i2cxfer.conditions == {I2CCondition.START}: 210 assert not command 211 command = i2cxfer 212 if i2cxfer.conditions == {I2CCondition.START, I2CCondition.STOP}: 213 if command: 214 yield PMBusRead(style(command), command, i2cxfer) 215 command = None 216 else: 217 yield PMBusWrite(style(i2cxfer), i2cxfer) 218 219 220def smbus_pec(data): 221 hash = crc8.crc8() 222 hash.update(data) 223 return hash.digest()[0] 224 225 226def smbus_pec_pack_address(address, record): 227 return (address << 1) | record.value 228 229 230class PMBusTransfer(object): 231 def calculate_pec(self): 232 raise NotImplementedError 233 234 def validate_pec(self): 235 if self.pec is None: 236 return True 237 derived = self.calculate_pec() 238 provided = self.pec 239 return provided == derived 240 241 def validate_xfer(self): 242 raise NotImplementedError 243 244 def valid(self): 245 return self.validate_xfer() and self.validate_pec() 246 247 248class PMBusWrite(PMBusTransfer): 249 def __init__(self, command, xfer): 250 assert xfer.record == I2CRecord.WRITE 251 self.command = command 252 self.xfer = xfer 253 254 if command.width is None: 255 start, end = 1, len(xfer.data.data) 256 elif command.width == -1: 257 start, end = 1, xfer.data.data[0] + 1 258 else: 259 start, end = 1, command.width + 1 260 261 self.data = xfer.data.data[start:end] 262 tail = self.data[end:] 263 264 if len(tail) == 1: 265 (self.pec,) = tail 266 else: 267 self.pec = None 268 269 self.response = xfer.data.response 270 271 def calculate_pec(self): 272 data = ( 273 smbus_pec_pack_address(self.xfer.address, self.xfer.record), 274 *self.xfer.data.data[:-1], 275 ) 276 return smbus_pec(bytes(data)) 277 278 def validate_xfer(self): 279 return self.response == I2CResponse.ACK 280 281 def __str__(self): 282 timestamp = self.xfer.timestamp.strftime("%M:%S.%f") 283 duration = self.xfer.duration.total_seconds() 284 data = "[ " + " ".join("{:02x}".format(v) for v in self.data) + " ]" 285 286 status = [] 287 if self.response != I2CResponse.ACK: 288 status.append(self.response.name) 289 290 if not self.validate_pec(): 291 status.append("!PEC") 292 293 if status: 294 status = " ".join(status) 295 fmt = ( 296 "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} " 297 + "{2.command.name} {4} {5}" 298 ) 299 return fmt.format( 300 timestamp, duration, self, I2CRecord.WRITE, data, status 301 ) 302 303 fmt = "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} {2.command.name} {4}" 304 return fmt.format(timestamp, duration, self, I2CRecord.WRITE, data) 305 306 307class PMBusRead(PMBusTransfer): 308 def __init__(self, command, start, repeat): 309 assert repeat.record == I2CRecord.READ 310 self.command = command 311 self.start = start 312 self.repeat = repeat 313 assert start.address == repeat.address 314 self.address = start.address 315 316 if self.command.width is None: 317 start, end = 0, len(repeat.data.data) 318 elif self.command.width == -1: 319 start, end = 1, repeat.data.data[0] + 1 320 else: 321 start, end = 0, command.width 322 323 self.data = repeat.data.data[start:end] 324 tail = repeat.data.data[end:] 325 326 if len(tail) == 1: 327 (self.pec,) = tail 328 else: 329 self.pec = None 330 331 self.response = repeat.data.response 332 333 def calculate_pec(self): 334 data = ( 335 smbus_pec_pack_address(self.start.address, self.start.record), 336 *self.start.data.data, 337 smbus_pec_pack_address(self.repeat.address, self.repeat.record), 338 *self.repeat.data.data[:-1], 339 ) 340 return smbus_pec(bytes(data)) 341 342 def validate_xfer(self): 343 return ( 344 self.start.data.response == I2CResponse.ACK 345 and self.repeat.data.response == I2CResponse.NACK 346 ) 347 348 def __str__(self): 349 timestamp = self.start.timestamp.strftime("%M:%S.%f") 350 duration = self.start.duration.total_seconds() 351 352 status = [] 353 if self.start.data.response != I2CResponse.ACK: 354 status.append(self.start.data.response.name) 355 356 if status: 357 status = " ".join(status) 358 fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name} {4}" 359 start = fmt.format( 360 timestamp, duration, self, I2CRecord.READ, status 361 ) 362 else: 363 fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name}" 364 start = fmt.format(timestamp, duration, self, I2CRecord.READ) 365 366 timestamp = self.repeat.timestamp.strftime("%M:%S.%f") 367 duration = self.repeat.duration.total_seconds() 368 data = " ".join("{:02x}".format(v) for v in self.data) 369 data = "[ " + data + " ]" 370 371 status = [] 372 if self.repeat.data.response != I2CResponse.NACK: 373 status.append(self.repeat.data.response.name) 374 375 if not self.validate_pec(): 376 status.append("!PEC") 377 378 status = " ".join(status) 379 fmt = "{0} {1:.6f} {2} {3}" 380 repeat = fmt.format(timestamp, duration, data, status) 381 382 return start + " | " + repeat 383 384 385def filter_source(src): 386 for line in src: 387 if not line: 388 continue 389 if line.startswith("#"): 390 continue 391 if "Capture started" in line: 392 continue 393 if "Capture stopped" in line: 394 continue 395 yield line 396 397 398def main(): 399 parser = argparse.ArgumentParser() 400 parser.add_argument("--after", type=str) 401 parser.add_argument("--before", type=str) 402 parser.add_argument("--longer-than", type=str) 403 parser.add_argument("--address", type=lambda x: int(x, 0)) 404 parser.add_argument("--pmbus", action="store_true") 405 parser.add_argument("--ucd90320", action="store_true") 406 parser.add_argument("--bad-transfers", action="store_true") 407 parser.add_argument("file", type=str) 408 args = parser.parse_args() 409 with open(args.file, "r") as src: 410 data = (line for line in filter_source(src.readlines())) 411 xfers = (to_i2ctransfer(e) for e in csv.reader(data)) 412 if args.after: 413 after = to_timestamp(args.after) 414 xfers = (e for e in xfers if e.timestamp > after) 415 if args.before: 416 before = to_timestamp(args.before) 417 xfers = (e for e in xfers if e.timestamp < before) 418 if args.longer_than: 419 minimum = to_duration(args.longer_than) 420 xfers = (e for e in xfers if e.duration > minimum) 421 if args.address is not None: 422 xfers = (e for e in xfers if e.address == args.address) 423 if args.ucd90320 or args.pmbus: 424 if args.ucd90320: 425 style = ucd90320command_style 426 else: 427 style = pmbuscommand_style 428 for xfer in as_smbustransfers(xfers, style): 429 if args.bad_transfers and xfer.valid(): 430 continue 431 print(xfer) 432 else: 433 for xfer in xfers: 434 timestamp = xfer.timestamp.strftime("%M:%S.%f") 435 duration = xfer.duration.total_seconds() 436 data = ( 437 "[ " 438 + " ".join("{:02x}".format(v) for v in xfer.data.data) 439 + " ]" 440 ) 441 res = xfer.data.response.name 442 fmt = "{0} {1:.6f} 0x{2.address:x} {2.record.name} {3} {4}" 443 print(fmt.format(timestamp, duration, xfer, data, res)) 444 445 446if __name__ == "__main__": 447 main() 448