xref: /openbmc/openbmc-tools/bi2cp/bi2cp (revision a3db66b3)
1#!/usr/bin/python3
2
3# SPDX-License-Identifier: Apache-2.0
4# Copyright 2020 IBM Corp.
5
6import argparse
7import csv
8import enum
9from collections import namedtuple
10from datetime import time, timedelta
11
12import crc8
13
14
15class UCD90320Command(bytes, enum.Enum):
16    def __new__(cls, value, width):
17        obj = bytes.__new__(cls, [value])
18        obj._value_ = value
19        obj.width = width
20        return obj
21
22    MONITOR_CONFIG = (0xD5, -1)
23    NUM_PAGES = (0xD6, 1)
24    GPIO_SELECT = (0xFA, 1)
25    GPIO_CONFIG = (0xFB, 1)
26    DEVICE_ID = (0xFD, -1)
27
28
29class PMBusCommand(bytes, enum.Enum):
30    def __new__(cls, value, width):
31        obj = bytes.__new__(cls, [value])
32        obj._value_ = value
33        obj.width = width
34        return obj
35
36    PAGE = (0x00, 1)
37    OPERATION = (0x01, 1)
38    ON_OFF_CONFIG = (0x02, 1)
39    CLEAR_FAULTS = (0x03, 0)
40    PHASE = (0x04, 1)
41    PAGE_PLUS_WRITE = (0x05, -1)
42    PAGE_PLUS_READ = (0x06, -1)
43    WRITE_PROTECT = (0x10, 1)
44    STORE_DEFAULT_ALL = (0x11, 0)
45    RESTORE_DEFAULT_ALL = (0x12, 0)
46    STORE_DEFAULT_CODE = (0x13, 1)
47    RESTORE_DEFAULT_CODE = (0x14, 1)
48    STORE_USER_ALL = (0x15, 0)
49    RESTORE_USER_ALL = (0x16, 0)
50    STORE_USER_CODE = (0x17, 1)
51    RESTORE_USER_CODE = (0x18, 1)
52    CAPABILITY = (0x19, 1)
53    QUERY = (0x1A, 1)
54    SMBALERT_MASK = (0x1B, 2)
55    VOUT_MODE = (0x20, 1)
56    VOUT_COMMAND = (0x21, 2)
57    VOUT_CAL_OFFSET = (0x23, 2)
58    POUT_MAX = (0x31, 2)
59    FREQUENCY_SWITCH = (0x33, 2)
60    VIN_OFF = (0x36, 2)
61    FAN_COMMAND_1 = (0x3B, 2)
62    FAN_COMMAND_4 = (0x3F, 2)
63    VOUT_OV_FAULT_LIMIT = (0x40, 2)
64    VOUT_OV_WARN_LIMIT = (0x42, 2)
65    VOUT_UV_WARN_LIMIT = (0x43, 2)
66    VOUT_UV_FAULT_LIMIT = (0x44, 2)
67    IOUT_OC_LV_FAULT_LIMIT = (0x48, 2)
68    IOUT_OC_LV_FAULT_RESPONSE = (0x49, 1)
69    IOUT_UC_FAULT_RESPONSE = (0x4C, 1)
70    OT_FAULT_LIMIT = (0x4F, 2)
71    OT_WARN_LIMIT = (0x51, 2)
72    UT_WARN_LIMIT = (0x52, 2)
73    UT_FAULT_LIMIT = (0x53, 2)
74    VIN_UV_FAULT_LIMIT = (0x59, 2)
75    IIN_OC_FAULT_RESPONSE = (0x5C, 1)
76    TOFF_MAX_WARN_LIMIT = (0x66, 2)
77    STATUS_WORD = (0x79, 2)
78    STATUS_CML = (0x7E, 1)
79    STATUS_OTHER = (0x7F, 1)
80    STATUS_MFR_SPECIFIC = (0x80, 1)
81    READ_TEMPERATURE_3 = (0x8F, 2)
82    PMBUS_REVISION = (0x98, 1)
83    MFR_MODEL = (0x9A, -1)
84    IC_DEVICE_REV = (0xAE, -1)
85    USER_DATA_00 = (0xB0, -1)
86    USER_DATA_08 = (0xB8, -1)
87    MFR_SPECIFIC_05 = (0xD5, None)
88    MFR_SPECIFIC_06 = (0xD6, None)
89    MFR_SPECIFIC_42 = (0xFA, None)
90    MFR_SPECIFIC_43 = (0xFB, None)
91    MFR_SPECIFIC_45 = (0xFD, None)
92
93
94class I2CCondition(enum.Enum):
95    START = 0
96    STOP = 1
97
98
99class I2CRecord(enum.Enum):
100    WRITE = 0
101    READ = 1
102
103
104class I2CResponse(enum.Enum):
105    ACK = 0
106    NACK = 1
107
108
109# Level,Index,m:s.ms.us,Dur,Len,Err,S/P,Addr,Record,Data
110# 0,1,0:29.722.525,210.600 us,1 B,,S,32,Write Transaction,0E
111I2CTransfer = namedtuple(
112    "I2CTransfer",
113    (
114        "level",
115        "index",
116        "timestamp",
117        "duration",
118        "length",
119        "error",
120        "conditions",
121        "address",
122        "record",
123        "data",
124    ),
125)
126Timestamp = namedtuple(
127    "Timestamp", ["minutes", "seconds", "milliseconds", "microseconds"]
128)
129I2CData = namedtuple("I2CData", ["response", "data"])
130
131SMBusTransfer = namedtuple("SMBusTransfer", ["command", "response"])
132
133
134def to_duration(field):
135    if field.endswith("us"):
136        if "." in field:
137            ms, us, _ = 0, *field.split(".")
138        else:
139            ms, us = 0, int(field.rstrip("us"))
140    elif field.endswith("ms"):
141        if "." in field:
142            ms, us, _ = field.split(".")
143        else:
144            ms, us = int(field.rstrip("ms")), 0
145    else:
146        raise ValueError
147    return timedelta(milliseconds=int(ms), microseconds=int(us))
148
149
150def to_timestamp(field):
151    ts = Timestamp(*list(int(v) for v in field.replace(":", ".").split(".")))
152    return time(
153        0, ts.minutes, ts.seconds, ts.milliseconds * 1000 + ts.microseconds
154    )
155
156
157def to_i2cdata(field):
158    resp = I2CResponse.NACK if field.endswith("*") else I2CResponse.ACK
159    return I2CData(resp, bytes(int(v, 16) for v in field.rstrip("*").split()))
160
161
162def to_address(field):
163    return int(field, 16)
164
165
166def to_i2cconditions(field):
167    if "S" == field:
168        return {I2CCondition.START}
169    elif "SP" == field:
170        return {I2CCondition.START, I2CCondition.STOP}
171    raise ValueError
172
173
174def to_i2crecord(field):
175    if "Write Transaction" == field:
176        return I2CRecord.WRITE
177    if "Read Transaction" == field:
178        return I2CRecord.READ
179    raise ValueError
180
181
182def to_i2ctransfer(line):
183    return I2CTransfer(
184        *line[:2],
185        to_timestamp(line[2]),
186        to_duration(line[3]),
187        *line[4:6],
188        to_i2cconditions(line[6]),
189        to_address(line[7]),
190        to_i2crecord(line[8]),
191        to_i2cdata(line[9])
192    )
193
194
195def pmbuscommand_style(xfer):
196    return PMBusCommand(xfer.data.data[0])
197
198
199def ucd90320command_style(xfer):
200    try:
201        return UCD90320Command(xfer.data.data[0])
202    except Exception:
203        return pmbuscommand_style(xfer)
204
205
206def as_smbustransfers(i2cxfers, style):
207    command = None
208    for i2cxfer in i2cxfers:
209        if i2cxfer.conditions == {I2CCondition.START}:
210            assert not command
211            command = i2cxfer
212        if i2cxfer.conditions == {I2CCondition.START, I2CCondition.STOP}:
213            if command:
214                yield PMBusRead(style(command), command, i2cxfer)
215                command = None
216            else:
217                yield PMBusWrite(style(i2cxfer), i2cxfer)
218
219
220def smbus_pec(data):
221    hash = crc8.crc8()
222    hash.update(data)
223    return hash.digest()[0]
224
225
226def smbus_pec_pack_address(address, record):
227    return (address << 1) | record.value
228
229
230class PMBusTransfer(object):
231    def calculate_pec(self):
232        raise NotImplementedError
233
234    def validate_pec(self):
235        if self.pec is None:
236            return True
237        derived = self.calculate_pec()
238        provided = self.pec
239        return provided == derived
240
241    def validate_xfer(self):
242        raise NotImplementedError
243
244    def valid(self):
245        return self.validate_xfer() and self.validate_pec()
246
247
248class PMBusWrite(PMBusTransfer):
249    def __init__(self, command, xfer):
250        assert xfer.record == I2CRecord.WRITE
251        self.command = command
252        self.xfer = xfer
253
254        if command.width is None:
255            start, end = 1, len(xfer.data.data)
256        elif command.width == -1:
257            start, end = 1, xfer.data.data[0] + 1
258        else:
259            start, end = 1, command.width + 1
260
261        self.data = xfer.data.data[start:end]
262        tail = self.data[end:]
263
264        if len(tail) == 1:
265            (self.pec,) = tail
266        else:
267            self.pec = None
268
269        self.response = xfer.data.response
270
271    def calculate_pec(self):
272        data = (
273            smbus_pec_pack_address(self.xfer.address, self.xfer.record),
274            *self.xfer.data.data[:-1],
275        )
276        return smbus_pec(bytes(data))
277
278    def validate_xfer(self):
279        return self.response == I2CResponse.ACK
280
281    def __str__(self):
282        timestamp = self.xfer.timestamp.strftime("%M:%S.%f")
283        duration = self.xfer.duration.total_seconds()
284        data = "[ " + " ".join("{:02x}".format(v) for v in self.data) + " ]"
285
286        status = []
287        if self.response != I2CResponse.ACK:
288            status.append(self.response.name)
289
290        if not self.validate_pec():
291            status.append("!PEC")
292
293        if status:
294            status = " ".join(status)
295            fmt = (
296                "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} "
297                + "{2.command.name} {4} {5}"
298            )
299            return fmt.format(
300                timestamp, duration, self, I2CRecord.WRITE, data, status
301            )
302
303        fmt = "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} {2.command.name} {4}"
304        return fmt.format(timestamp, duration, self, I2CRecord.WRITE, data)
305
306
307class PMBusRead(PMBusTransfer):
308    def __init__(self, command, start, repeat):
309        assert repeat.record == I2CRecord.READ
310        self.command = command
311        self.start = start
312        self.repeat = repeat
313        assert start.address == repeat.address
314        self.address = start.address
315
316        if self.command.width is None:
317            start, end = 0, len(repeat.data.data)
318        elif self.command.width == -1:
319            start, end = 1, repeat.data.data[0] + 1
320        else:
321            start, end = 0, command.width
322
323        self.data = repeat.data.data[start:end]
324        tail = repeat.data.data[end:]
325
326        if len(tail) == 1:
327            (self.pec,) = tail
328        else:
329            self.pec = None
330
331        self.response = repeat.data.response
332
333    def calculate_pec(self):
334        data = (
335            smbus_pec_pack_address(self.start.address, self.start.record),
336            *self.start.data.data,
337            smbus_pec_pack_address(self.repeat.address, self.repeat.record),
338            *self.repeat.data.data[:-1],
339        )
340        return smbus_pec(bytes(data))
341
342    def validate_xfer(self):
343        return (
344            self.start.data.response == I2CResponse.ACK
345            and self.repeat.data.response == I2CResponse.NACK
346        )
347
348    def __str__(self):
349        timestamp = self.start.timestamp.strftime("%M:%S.%f")
350        duration = self.start.duration.total_seconds()
351
352        status = []
353        if self.start.data.response != I2CResponse.ACK:
354            status.append(self.start.data.response.name)
355
356        if status:
357            status = " ".join(status)
358            fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name} {4}"
359            start = fmt.format(
360                timestamp, duration, self, I2CRecord.READ, status
361            )
362        else:
363            fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name}"
364            start = fmt.format(timestamp, duration, self, I2CRecord.READ)
365
366        timestamp = self.repeat.timestamp.strftime("%M:%S.%f")
367        duration = self.repeat.duration.total_seconds()
368        data = " ".join("{:02x}".format(v) for v in self.data)
369        data = "[ " + data + " ]"
370
371        status = []
372        if self.repeat.data.response != I2CResponse.NACK:
373            status.append(self.repeat.data.response.name)
374
375        if not self.validate_pec():
376            status.append("!PEC")
377
378        status = " ".join(status)
379        fmt = "{0} {1:.6f} {2} {3}"
380        repeat = fmt.format(timestamp, duration, data, status)
381
382        return start + " | " + repeat
383
384
385def filter_source(src):
386    for line in src:
387        if not line:
388            continue
389        if line.startswith("#"):
390            continue
391        if "Capture started" in line:
392            continue
393        if "Capture stopped" in line:
394            continue
395        yield line
396
397
398def main():
399    parser = argparse.ArgumentParser()
400    parser.add_argument("--after", type=str)
401    parser.add_argument("--before", type=str)
402    parser.add_argument("--longer-than", type=str)
403    parser.add_argument("--address", type=lambda x: int(x, 0))
404    parser.add_argument("--pmbus", action="store_true")
405    parser.add_argument("--ucd90320", action="store_true")
406    parser.add_argument("--bad-transfers", action="store_true")
407    parser.add_argument("file", type=str)
408    args = parser.parse_args()
409    with open(args.file, "r") as src:
410        data = (line for line in filter_source(src.readlines()))
411        xfers = (to_i2ctransfer(e) for e in csv.reader(data))
412        if args.after:
413            after = to_timestamp(args.after)
414            xfers = (e for e in xfers if e.timestamp > after)
415        if args.before:
416            before = to_timestamp(args.before)
417            xfers = (e for e in xfers if e.timestamp < before)
418        if args.longer_than:
419            minimum = to_duration(args.longer_than)
420            xfers = (e for e in xfers if e.duration > minimum)
421        if args.address is not None:
422            xfers = (e for e in xfers if e.address == args.address)
423        if args.ucd90320 or args.pmbus:
424            if args.ucd90320:
425                style = ucd90320command_style
426            else:
427                style = pmbuscommand_style
428            for xfer in as_smbustransfers(xfers, style):
429                if args.bad_transfers and xfer.valid():
430                    continue
431                print(xfer)
432        else:
433            for xfer in xfers:
434                timestamp = xfer.timestamp.strftime("%M:%S.%f")
435                duration = xfer.duration.total_seconds()
436                data = (
437                    "[ "
438                    + " ".join("{:02x}".format(v) for v in xfer.data.data)
439                    + " ]"
440                )
441                res = xfer.data.response.name
442                fmt = "{0} {1:.6f} 0x{2.address:x} {2.record.name} {3} {4}"
443                print(fmt.format(timestamp, duration, xfer, data, res))
444
445
446if __name__ == "__main__":
447    main()
448