xref: /openbmc/qemu/scripts/qmp_helper.py (revision 92a0dcbd751d771512b9dedd97e00553181b7699)
1#!/usr/bin/env python3
2#
3# pylint: disable=C0103,E0213,E1135,E1136,E1137,R0902,R0903,R0912,R0913,R0917
4# SPDX-License-Identifier: GPL-2.0-or-later
5#
6# Copyright (C) 2024-2025 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
7
8"""
9Helper classes to be used by ghes_inject command classes.
10"""
11
12import json
13import sys
14
15from datetime import datetime
16from os import path as os_path
17
18try:
19    qemu_dir = os_path.abspath(os_path.dirname(os_path.dirname(__file__)))
20    sys.path.append(os_path.join(qemu_dir, 'python'))
21
22    from qemu.qmp.legacy import QEMUMonitorProtocol
23
24except ModuleNotFoundError as exc:
25    print(f"Module '{exc.name}' not found.")
26    print("Try export PYTHONPATH=top-qemu-dir/python or run from top-qemu-dir")
27    sys.exit(1)
28
29from base64 import b64encode
30
class util:
    """
    Ancillary functions to deal with bitmaps, parse command line
    arguments and encode data on a bytearray buffer.

    All helpers are stateless and are meant to be called through the
    class itself, e.g. ``util.bit(3)``.
    """

    #
    # Helper routines to handle multiple choice arguments
    #
    @staticmethod
    def _parse_uint(name, val, max_val=None):
        """
        Convert ``val`` into a non-negative integer.

        The base is auto-detected by ``int(val, 0)``, so decimal, ``0x``,
        ``0o`` and ``0b`` notations are accepted.

        Exits the program, with a message mentioning ``name``, when
        ``val`` is not an integer, is negative or is above ``max_val``
        (``None`` means no upper limit).
        """

        try:
            number = int(val, 0)
        except ValueError:
            sys.exit(f"Error on '{name}': {val} is not an integer")

        if number < 0:
            sys.exit(f"Error on '{name}': {number} is not unsigned")

        # Compare against None so that a limit of 0 is honoured as well
        if max_val is not None and number > max_val:
            sys.exit(f"Error on '{name}': {number} is too big")

        return number

    @staticmethod
    def get_choice(name, value, choices, suffixes=None, bitmask=True):
        """
        Convert a comma-separated multiple choice argument into an integer.

        ``choices`` maps each accepted (lower case) keyword to its value.
        ``suffixes`` is an optional list of endings that may be either
        present or omitted on each keyword. With ``bitmask`` set, the
        values of all keywords are OR-ed together; otherwise exactly one
        keyword is accepted and its value is returned.

        Returns 0 when ``value`` is empty or None. Exits the program with
        an error message mentioning ``name`` on invalid input.
        """

        result = 0

        if not value:
            return result

        for count, val in enumerate(value.split(",")):
            val = val.strip().lower()

            if suffixes:
                # Normalize by dropping any known suffix...
                for suffix in suffixes:
                    val = val.removesuffix(suffix)

                # ... and, if the bare name is unknown, try the suffixed one
                if val not in choices:
                    for suffix in suffixes:
                        if val + suffix in choices:
                            val += suffix
                            break

            if val not in choices:
                sys.exit(f"Error on '{name}': choice '{val}' is invalid.")

            if bitmask:
                result |= choices[val]
            elif count:
                # Count the items instead of testing the accumulated value:
                # a first choice that maps to 0 must not allow a second one
                sys.exit(f"Error on '{name}': only one value is accepted.")
            else:
                result = choices[val]

        return result

    @staticmethod
    def get_array(name, values, max_val=None):
        """
        Flatten a list of comma-separated integer strings into a list.

        Each number must be unsigned and, if ``max_val`` is given, not
        above it; otherwise the program exits with an error message.
        """

        return [util._parse_uint(name, val, max_val)
                for value in values
                for val in value.split(",")]

    @staticmethod
    def get_mult_array(mult, name, values, allow_zero=False, max_val=None):
        """
        Store integer lists on a numbered dictionary.

        The N-th element of ``values`` is a comma-separated list of
        unsigned integers, appended to ``mult[N][name]``. Entries are
        created on ``mult`` as needed.

        With ``allow_zero``, an empty (but not None) ``values`` stores an
        empty list at ``mult[0][name]``; otherwise empty input is ignored.
        """

        if values is None or (not values and not allow_zero):
            return

        if not values:
            mult.setdefault(0, {})[name] = []
            return

        for i, value in enumerate(values):
            for val in value.split(","):
                number = util._parse_uint(name, val, max_val)
                mult.setdefault(i, {}).setdefault(name, []).append(number)

    @staticmethod
    def get_mult_choices(mult, name, values, choices,
                         suffixes=None, allow_zero=False):
        """
        Store multiple choice arguments on a numbered dictionary.

        The N-th element of ``values`` is converted with get_choice()
        in bitmask mode and stored at ``mult[N][name]``.
        """

        if values is None or (not values and not allow_zero):
            return

        for i, val in enumerate(values):
            mult.setdefault(i, {})[name] = util.get_choice(name, val,
                                                           choices, suffixes)

    @staticmethod
    def get_mult_int(mult, name, values, allow_zero=False):
        """
        Store unsigned integer arguments on a numbered dictionary.

        The N-th element of ``values`` is converted to an integer and
        stored at ``mult[N][name]``.
        """

        if values is None or (not values and not allow_zero):
            return

        for i, val in enumerate(values):
            mult.setdefault(i, {})[name] = util._parse_uint(name, val)

    #
    # Data encode helper functions
    #
    @staticmethod
    def bit(b):
        """Simple macro to define a bit on a bitmask"""
        return 1 << b

    @staticmethod
    def data_add(data, value, num_bytes):
        """Append ``value`` to ``data`` as ``num_bytes`` little-endian bytes"""

        data.extend(value.to_bytes(num_bytes, byteorder="little"))  # pylint: disable=E1101

    @staticmethod
    def dump_bytearray(name, data):
        """
        Print an hexdump of a byte array, 16 bytes per line.

        Each line has the offset, the bytes in hexadecimal and their
        printable ASCII representation ('.' for anything else).
        """

        print(f"{name} ({len(data)} bytes):")

        for start in range(0, len(data), 16):
            chunk = data[start:start + 16]

            hexa = "".join(f"{byte:02x} " for byte in chunk)
            text = "".join(chr(byte) if 32 <= byte < 127 else "."
                           for byte in chunk)

            # 16 bytes * 3 chars: keeps the ASCII column aligned on a
            # short last line
            print(f"      {start:08x}  {hexa:<48}  {text}")

        print()

    @staticmethod
    def time(string):
        """
        Parse a timestamp argument, returning a datetime object.

        Accepts either "now" or a "YYYY-MM-DD HH:MM:SS" string. Raises
        ValueError on any other input.
        """

        if string == "now":
            return datetime.now()

        # Formats to be used when parsing time stamps
        formats = (
            "%Y-%m-%d %H:%M:%S",
        )

        for fmt in formats:
            try:
                return datetime.strptime(string, fmt)
            except ValueError:
                continue

        raise ValueError("Invalid time format")
250
class guid:
    """
    Simple class to handle GUID fields.

    A GUID is stored as three integers (time_low, time_mid, time_high)
    plus a list with 8 node bytes, following the field split used on
    its ``xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`` text representation.
    """

    # Characters accepted as hexadecimal digits by UUID()
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

    # Number of hexadecimal digits on each hyphen-separated group
    _GROUP_SIZES = [8, 4, 4, 4, 12]

    def __init__(self, time_low, time_mid, time_high, nodes):
        """
        Initialize a GUID value.

        ``nodes`` must have exactly 8 elements; ValueError is raised
        otherwise.
        """

        # Don't rely on assert here: it is removed when running with -O
        if len(nodes) != 8:
            raise ValueError("GUID requires exactly 8 node bytes")

        self.time_low = time_low
        self.time_mid = time_mid
        self.time_high = time_high
        self.nodes = nodes

    @classmethod
    def UUID(cls, guid_str):
        """
        Initialize a GUID using a string on its standard format:
        ``xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx``.

        Raises ValueError if the string size, the hyphen positions or
        any of the hexadecimal digits is invalid.
        """

        if len(guid_str) != 36:
            raise ValueError('Invalid GUID size')

        groups = guid_str.split('-')

        if [len(group) for group in groups] != cls._GROUP_SIZES:
            raise ValueError('Invalid GUID format')

        # int(x, 16) would also accept signs, blanks and underscores, so
        # check the digits explicitly
        if not all(char in cls._HEX_DIGITS for char in ''.join(groups)):
            raise ValueError('Invalid GUID hex digit')

        time_low = int(groups[0], 16)
        time_mid = int(groups[1], 16)
        time_high = int(groups[2], 16)

        # The last two groups are kept as a sequence of 8 bytes
        nodes = list(bytes.fromhex(groups[3] + groups[4]))

        return cls(time_low, time_mid, time_high, nodes)

    def __str__(self):
        """Output a GUID value on its default string representation"""

        # First two node bytes form the 4th group, big-endian
        clock = self.nodes[0] << 8 | self.nodes[1]

        # Remaining six node bytes form the last group, big-endian
        node = 0
        for value in self.nodes[2:]:
            node = node << 8 | value

        return (f"{self.time_low:08x}-{self.time_mid:04x}-"
                f"{self.time_high:04x}-{clock:04x}-{node:012x}")

    def to_bytes(self):
        """
        Output a GUID value in bytes.

        The three time fields are encoded as little-endian integers of
        4, 2 and 2 bytes, followed by the 8 node bytes in order.
        """

        data = bytearray()

        util.data_add(data, self.time_low, 4)
        util.data_add(data, self.time_mid, 2)
        util.data_add(data, self.time_high, 2)
        data.extend(bytearray(self.nodes))

        return data
321
class qmp:
    """
    Opens a connection and send/receive QMP commands.

    It also keeps the fields of the ACPI Generic Error Status Block and
    Generic Error Data Entry that wrap a CPER payload before sending it
    to QEMU via the ``inject-ghes-v2-error`` QMP command.
    """

    BLOCK_STATUS_BITS = {
        "uncorrectable":            util.bit(0),
        "correctable":              util.bit(1),
        "multi-uncorrectable":      util.bit(2),
        "multi-correctable":        util.bit(3),
    }

    ERROR_SEVERITY = {
        "recoverable":  0,
        "fatal":        1,
        "corrected":    2,
        "none":         3,
    }

    VALIDATION_BITS = {
        "fru-id":       util.bit(0),
        "fru-text":     util.bit(1),
        "timestamp":    util.bit(2),
    }

    GEDB_FLAGS_BITS = {
        "recovered":    util.bit(0),
        "prev-error":   util.bit(1),
        "simulated":    util.bit(2),
    }

    # Size of the Generic Error Data Entry header built by send_cper():
    # 16 (GUID) + 4 + 2 + 1 + 1 + 4 + 16 (FRU id) + 20 (FRU text) + 8
    GENERIC_DATA_SIZE = 72

    # Size of the Generic Error Status Block header built by send_cper():
    # five 32-bit fields
    GENERIC_STATUS_SIZE = 20

    # Fixed size of the FRU text field
    FRU_TEXT_SIZE = 20

    def __init__(self, host, port, debug=False):
        """
        Initialize variables used by the QMP send logic.

        The connection to ``host``:``port`` is not opened here: it is
        opened on demand by the methods that send commands.
        """

        self.connected = False
        self.host = host
        self.port = port
        self.debug = debug

        # ACPI 6.1: 18.3.2.7.1 Generic Error Data: Generic Error Status Block
        self.block_status = self.BLOCK_STATUS_BITS["uncorrectable"]
        self.raw_data = []
        self.error_severity = self.ERROR_SEVERITY["recoverable"]

        # ACPI 6.1: 18.3.2.7.1 Generic Error Data: Generic Error Data Entry
        self.validation_bits = 0
        self.flags = 0
        self.fru_id = bytearray(16)
        self.fru_text = bytearray(self.FRU_TEXT_SIZE)
        self.timestamp = bytearray(8)

        self.qmp_monitor = QEMUMonitorProtocol(address=(self.host, self.port))

    def send_cmd(self, command, args=None, may_open=False, return_error=True):
        """
        Send a command to QMP, optionally opening a connection.

        Returns:
          - False if not connected and ``may_open`` is not set;
          - None if the command failed. The error is printed, except for
            QMP error replies when ``return_error`` is False;
          - "OK" if QMP returned an empty dictionary;
          - the content of the QMP "return" field otherwise.
        """

        if may_open:
            self._connect()
        elif not self.connected:
            return False

        msg = {'execute': command}
        if args:
            msg['arguments'] = args

        try:
            obj = self.qmp_monitor.cmd_obj(msg)
        # Can we use some other exception class here?
        except Exception as e:                         # pylint: disable=W0718
            print(f"Command: {command}")
            print(f"Failed to inject error: {e}.")
            return None

        if "return" in obj:
            result = obj["return"]

            # An empty dict means success without data: return something
            # that evaluates as True, so callers can test for success
            if isinstance(result, dict) and not result:
                return "OK"

            return result

        error = obj.get("error")
        if not isinstance(error, dict):
            print(json.dumps(obj))
        elif return_error:
            print(f"Command: {msg}")
            print(f'{error["class"]}: {error["desc"]}')

        return None

    def _close(self):
        """Shutdown and close the socket, if opened"""
        if not self.connected:
            return

        self.qmp_monitor.close()
        self.connected = False

    def _connect(self):
        """
        Connect to a QMP TCP/IP port, if not connected yet.

        Exits the program if the connection fails.
        """

        if self.connected:
            return True

        try:
            self.qmp_monitor.connect(negotiate=True)
        except ConnectionError:
            sys.exit(f"Can't connect to QMP host {self.host}:{self.port}")

        self.connected = True

        return True

    @staticmethod
    def argparse(parser):
        """Prepare a parser group to query generic error data"""

        block_status_bits = ",".join(qmp.BLOCK_STATUS_BITS)
        error_severity_enum = ",".join(qmp.ERROR_SEVERITY)
        validation_bits = ",".join(qmp.VALIDATION_BITS)
        gedb_flags_bits = ",".join(qmp.GEDB_FLAGS_BITS)

        g_gen = parser.add_argument_group("Generic Error Data")  # pylint: disable=E1101
        g_gen.add_argument("--block-status",
                           help=f"block status bits: {block_status_bits}")
        g_gen.add_argument("--raw-data", nargs="+",
                           help="Raw data inside the Error Status Block")
        g_gen.add_argument("--error-severity", "--severity",
                           help=f"error severity: {error_severity_enum}")
        g_gen.add_argument("--gen-err-valid-bits",
                           "--generic-error-validation-bits",
                           help=f"validation bits: {validation_bits}")
        g_gen.add_argument("--fru-id", type=guid.UUID,
                           help="GUID representing a physical device")
        g_gen.add_argument("--fru-text",
                           help="ASCII string identifying the FRU hardware")
        g_gen.add_argument("--timestamp", type=util.time,
                           help="Time when the error info was collected")
        g_gen.add_argument("--precise", "--precise-timestamp",
                           action='store_true',
                           help="Marks the timestamp as precise if --timestamp is used")
        g_gen.add_argument("--gedb-flags",
                           help=f"General Error Data Block flags: {gedb_flags_bits}")

    @staticmethod
    def _bcd(value):
        """Encode a number between 0 and 99 as a packed BCD byte"""
        return (value // 10) << 4 | (value % 10)

    def set_args(self, args):
        """
        Set the arguments optionally defined via self.argparse().

        Validation bits are set automatically for each of FRU id, FRU
        text and timestamp that is provided, unless they are explicitly
        given via --gen-err-valid-bits.
        """

        if args.block_status:
            self.block_status = util.get_choice(name="block-status",
                                                value=args.block_status,
                                                choices=self.BLOCK_STATUS_BITS,
                                                bitmask=False)
        if args.raw_data:
            self.raw_data = util.get_array("raw-data", args.raw_data,
                                           max_val=255)
            if self.debug:
                print(self.raw_data)

        if args.error_severity:
            self.error_severity = util.get_choice(name="error-severity",
                                                  value=args.error_severity,
                                                  choices=self.ERROR_SEVERITY,
                                                  bitmask=False)

        # Use getattr, as callers may pass a namespace created before
        # --gedb-flags was handled here
        gedb_flags = getattr(args, "gedb_flags", None)
        if gedb_flags:
            self.flags = util.get_choice(name="gedb-flags",
                                         value=gedb_flags,
                                         choices=self.GEDB_FLAGS_BITS)

        if args.fru_id:
            self.fru_id = args.fru_id.to_bytes()
            if not args.gen_err_valid_bits:
                self.validation_bits |= self.VALIDATION_BITS["fru-id"]

        if args.fru_text:
            try:
                text = bytearray(args.fru_text.encode('ascii'))
            except UnicodeEncodeError:
                sys.exit("FRU text must contain only ASCII characters")

            if len(text) > self.FRU_TEXT_SIZE:
                sys.exit("FRU text is too big to fit")

            # The field has a fixed size: pad with zeros, as otherwise
            # all fields after it would be shifted
            self.fru_text = text.ljust(self.FRU_TEXT_SIZE, b"\0")
            if not args.gen_err_valid_bits:
                self.validation_bits |= self.VALIDATION_BITS["fru-text"]

        if args.timestamp:
            time = args.timestamp

            # One byte per field; the 4th byte carries the "precise" flag
            self.timestamp = bytearray([
                self._bcd(time.second),
                self._bcd(time.minute),
                self._bcd(time.hour),
                1 if args.precise else 0,
                self._bcd(time.day),
                self._bcd(time.month),
                self._bcd(time.year % 100),
                self._bcd((time.year // 100) % 100),
            ])

            if not args.gen_err_valid_bits:
                self.validation_bits |= self.VALIDATION_BITS["timestamp"]

        if args.gen_err_valid_bits:
            self.validation_bits = util.get_choice(name="validation",
                                                   value=args.gen_err_valid_bits,
                                                   choices=self.VALIDATION_BITS)

    #
    # Socket QMP send command
    #
    def send_cper_raw(self, cper_data):
        """
        Send a raw CPER data to QEMU through QMP TCP socket.

        ``cper_data`` is sent base64-encoded as the argument of the
        ``inject-ghes-v2-error`` command.
        """

        data = b64encode(bytes(cper_data)).decode('ascii')

        cmd_arg = {
            'cper': data
        }

        self._connect()

        if self.send_cmd("inject-ghes-v2-error", cmd_arg):
            print("Error injected.")

    def send_cper(self, notif_type, payload):
        """
        Send commands to QEMU through QMP TCP socket.

        Wraps ``payload`` with a Generic Error Status Block and a Generic
        Error Data Entry whose section type is ``notif_type`` (a guid
        object) and sends the result with send_cper_raw().

        The record is laid out as: status block, data entry header,
        payload and, at the end, the optional raw data.
        """

        # NOTE: bits 4 to 13 of block status contain the number of
        # data entries in the data section. This is currently unsupported.

        cper_length = len(payload)

        # Data length covers only the data entry (header + payload): raw
        # data comes after it and is described by its own offset/length
        data_length = self.GENERIC_DATA_SIZE + cper_length

        raw_data_length = len(self.raw_data)
        if raw_data_length:
            # Offset is relative to the beginning of the status block
            raw_data_offset = self.GENERIC_STATUS_SIZE + data_length
        else:
            raw_data_offset = 0

        #  Generic Error Data Entry
        gede = bytearray()

        gede.extend(notif_type.to_bytes())
        util.data_add(gede, self.error_severity, 4)
        util.data_add(gede, 0x300, 2)
        util.data_add(gede, self.validation_bits, 1)
        util.data_add(gede, self.flags, 1)
        util.data_add(gede, cper_length, 4)
        gede.extend(self.fru_id)
        gede.extend(self.fru_text)
        gede.extend(self.timestamp)

        # Generic Error Status Block
        gebs = bytearray()

        util.data_add(gebs, self.block_status, 4)
        util.data_add(gebs, raw_data_offset, 4)
        util.data_add(gebs, raw_data_length, 4)
        util.data_add(gebs, data_length, 4)
        util.data_add(gebs, self.error_severity, 4)

        cper_data = bytearray()
        cper_data.extend(gebs)
        cper_data.extend(gede)
        cper_data.extend(bytearray(payload))
        cper_data.extend(bytearray(self.raw_data))

        if self.debug:
            print(f"GUID: {notif_type}")

            util.dump_bytearray("Generic Error Status Block", gebs)
            util.dump_bytearray("Generic Error Data Entry", gede)
            util.dump_bytearray("Payload", payload)

            if self.raw_data:
                util.dump_bytearray("Raw data", bytearray(self.raw_data))

        self.send_cper_raw(cper_data)

    def search_qom(self, path, prop, regex):
        """
        Return a list of devices that match path array like:

            /machine/unattached/device
            /machine/peripheral-anon/device
            ...

        Objects ``path[0]``, ``path[1]``, ... are queried with qom-get
        until one of them fails or returns an empty value. The ones
        whose ``prop`` is a string matching the compiled ``regex`` are
        returned.
        """

        found = []

        # Upper limit, to avoid looping forever on unexpected replies
        for i in range(10001):
            dev = f"{path}[{i}]"
            args = {
                'path': dev,
                'property': prop
            }
            ret = self.send_cmd("qom-get", args, may_open=True,
                                return_error=False)
            if not ret:
                break

            if isinstance(ret, str) and regex.search(ret):
                found.append(dev)
        else:
            print("Too many objects returned by qom-get!")

        return found
638
class cper_guid:
    """
    Contains CPER GUID, as per:
    https://uefi.org/specs/UEFI/2.10/Apx_N_Common_Platform_Error_Record.html

    Each attribute is a guid object, built from the GUID text
    representation.
    """

    # Processor error sections
    CPER_PROC_GENERIC = guid.UUID("9876ccad-47b4-4bdb-b65e-16f193c4f3db")
    CPER_PROC_X86 = guid.UUID("dc3ea0b0-a144-4797-b95b-53fa242b6e1d")
    CPER_PROC_ITANIUM = guid.UUID("e429faf1-3cb7-11d4-bca7-0080c73c8881")
    CPER_PROC_ARM = guid.UUID("e19e3d16-bc11-11e4-9caa-c2051d5d46b0")

    # Platform memory error sections
    CPER_PLATFORM_MEM = guid.UUID("a5bc1114-6f64-4ede-b863-3e83ed7c83b1")
    CPER_PLATFORM_MEM2 = guid.UUID("61ec04fc-48e6-d813-25c9-8daa44750b12")

    # PCIe and PCI error sections
    CPER_PCIE = guid.UUID("d995e954-bbc1-430f-ad91-b44dcb3c6f35")
    CPER_PCI_BUS = guid.UUID("c5753963-3b84-4095-bf78-eddad3f9c9dd")
    CPER_PCI_DEV = guid.UUID("eb5e4685-ca66-4769-b6a2-26068b001326")

    # Firmware error section
    CPER_FW_ERROR = guid.UUID("81212a96-09ed-4996-9471-8d729c8e69ed")

    # DMA remapping error sections
    CPER_DMA_GENERIC = guid.UUID("5b51fef7-c79d-4434-8f1b-aa62de3e2c64")
    CPER_DMA_VT = guid.UUID("71761d37-32b2-45cd-a7d0-b0fedd93e8cf")
    CPER_DMA_IOMMU = guid.UUID("036f84e1-7f37-428c-a79e-575fdfaa84ec")

    # CCIX and CXL protocol error sections
    CPER_CCIX_PER = guid.UUID("91335ef6-ebfb-4478-a6a6-88b728cf75d7")
    CPER_CXL_PROT_ERR = guid.UUID("80b9efb4-52b5-4de3-a777-68784b771048")
704