1#!/usr/bin/env python3 2# 3# pylint: disable=C0103,E0213,E1135,E1136,E1137,R0902,R0903,R0912,R0913,R0917 4# SPDX-License-Identifier: GPL-2.0-or-later 5# 6# Copyright (C) 2024-2025 Mauro Carvalho Chehab <mchehab+huawei@kernel.org> 7 8""" 9Helper classes to be used by ghes_inject command classes. 10""" 11 12import json 13import sys 14 15from datetime import datetime 16from os import path as os_path 17 18try: 19 qemu_dir = os_path.abspath(os_path.dirname(os_path.dirname(__file__))) 20 sys.path.append(os_path.join(qemu_dir, 'python')) 21 22 from qemu.qmp.legacy import QEMUMonitorProtocol 23 24except ModuleNotFoundError as exc: 25 print(f"Module '{exc.name}' not found.") 26 print("Try export PYTHONPATH=top-qemu-dir/python or run from top-qemu-dir") 27 sys.exit(1) 28 29from base64 import b64encode 30 31class util: 32 """ 33 Ancillary functions to deal with bitmaps, parse arguments, 34 generate GUID and encode data on a bytearray buffer. 35 """ 36 37 # 38 # Helper routines to handle multiple choice arguments 39 # 40 def get_choice(name, value, choices, suffixes=None, bitmask=True): 41 """Produce a list from multiple choice argument""" 42 43 new_values = 0 44 45 if not value: 46 return new_values 47 48 for val in value.split(","): 49 val = val.lower() 50 51 if suffixes: 52 for suffix in suffixes: 53 val = val.removesuffix(suffix) 54 55 if val not in choices.keys(): 56 if suffixes: 57 for suffix in suffixes: 58 if val + suffix in choices.keys(): 59 val += suffix 60 break 61 62 if val not in choices.keys(): 63 sys.exit(f"Error on '{name}': choice '{val}' is invalid.") 64 65 val = choices[val] 66 67 if bitmask: 68 new_values |= val 69 else: 70 if new_values: 71 sys.exit(f"Error on '{name}': only one value is accepted.") 72 73 new_values = val 74 75 return new_values 76 77 def get_array(name, values, max_val=None): 78 """Add numbered hashes from integer lists into an array""" 79 80 array = [] 81 82 for value in values: 83 for val in value.split(","): 84 try: 85 val = int(val, 0) 86 except ValueError: 87 sys.exit(f"Error on '{name}': {val} is not an integer") 88 89 if val < 0: 90 sys.exit(f"Error on '{name}': {val} is not unsigned") 91 92 if max_val and val > max_val: 93 sys.exit(f"Error on '{name}': {val} is too little") 94 95 array.append(val) 96 97 return array 98 99 def get_mult_array(mult, name, values, allow_zero=False, max_val=None): 100 """Add numbered hashes from integer lists""" 101 102 if not allow_zero: 103 if not values: 104 return 105 else: 106 if values is None: 107 return 108 109 if not values: 110 i = 0 111 if i not in mult: 112 mult[i] = {} 113 114 mult[i][name] = [] 115 return 116 117 i = 0 118 for value in values: 119 for val in value.split(","): 120 try: 121 val = int(val, 0) 122 except ValueError: 123 sys.exit(f"Error on '{name}': {val} is not an integer") 124 125 if val < 0: 126 sys.exit(f"Error on '{name}': {val} is not unsigned") 127 128 if max_val and val > max_val: 129 sys.exit(f"Error on '{name}': {val} is too little") 130 131 if i not in mult: 132 mult[i] = {} 133 134 if name not in mult[i]: 135 mult[i][name] = [] 136 137 mult[i][name].append(val) 138 139 i += 1 140 141 142 def get_mult_choices(mult, name, values, choices, 143 suffixes=None, allow_zero=False): 144 """Add numbered hashes from multiple choice arguments""" 145 146 if not allow_zero: 147 if not values: 148 return 149 else: 150 if values is None: 151 return 152 153 i = 0 154 for val in values: 155 new_values = util.get_choice(name, val, choices, suffixes) 156 157 if i not in mult: 158 mult[i] = {} 159 160 mult[i][name] = new_values 161 i += 1 162 163 164 def get_mult_int(mult, name, values, allow_zero=False): 165 """Add numbered hashes from integer arguments""" 166 if not allow_zero: 167 if not values: 168 return 169 else: 170 if values is None: 171 return 172 173 i = 0 174 for val in values: 175 try: 176 val = int(val, 0) 177 except ValueError: 178 sys.exit(f"Error on '{name}': {val} is not an integer") 179 180 if val < 0: 181 sys.exit(f"Error on '{name}': {val} is not unsigned") 182 183 if i not in mult: 184 mult[i] = {} 185 186 mult[i][name] = val 187 i += 1 188 189 190 # 191 # Data encode helper functions 192 # 193 def bit(b): 194 """Simple macro to define a bit on a bitmask""" 195 return 1 << b 196 197 198 def data_add(data, value, num_bytes): 199 """Adds bytes from value inside a bitarray""" 200 201 data.extend(value.to_bytes(num_bytes, byteorder="little")) # pylint: disable=E1101 202 203 def dump_bytearray(name, data): 204 """Does an hexdump of a byte array, grouping in bytes""" 205 206 print(f"{name} ({len(data)} bytes):") 207 208 for ln_start in range(0, len(data), 16): 209 ln_end = min(ln_start + 16, len(data)) 210 print(f" {ln_start:08x} ", end="") 211 for i in range(ln_start, ln_end): 212 print(f"{data[i]:02x} ", end="") 213 for i in range(ln_end, ln_start + 16): 214 print(" ", end="") 215 print(" ", end="") 216 for i in range(ln_start, ln_end): 217 if data[i] >= 32 and data[i] < 127: 218 print(chr(data[i]), end="") 219 else: 220 print(".", end="") 221 222 print() 223 print() 224 225 def time(string): 226 """Handle BCD timestamps used on Generic Error Data Block""" 227 228 time = None 229 230 # Formats to be used when parsing time stamps 231 formats = [ 232 "%Y-%m-%d %H:%M:%S", 233 ] 234 235 if string == "now": 236 time = datetime.now() 237 238 if time is None: 239 for fmt in formats: 240 try: 241 time = datetime.strptime(string, fmt) 242 break 243 except ValueError: 244 pass 245 246 if time is None: 247 raise ValueError("Invalid time format") 248 249 return time 250 251class guid: 252 """ 253 Simple class to handle GUID fields. 254 """ 255 256 def __init__(self, time_low, time_mid, time_high, nodes): 257 """Initialize a GUID value""" 258 259 assert len(nodes) == 8 260 261 self.time_low = time_low 262 self.time_mid = time_mid 263 self.time_high = time_high 264 self.nodes = nodes 265 266 @classmethod 267 def UUID(cls, guid_str): 268 """Initialize a GUID using a string on its standard format""" 269 270 if len(guid_str) != 36: 271 print("Size not 36") 272 raise ValueError('Invalid GUID size') 273 274 # It is easier to parse without separators. So, drop them 275 guid_str = guid_str.replace('-', '') 276 277 if len(guid_str) != 32: 278 print("Size not 32", guid_str, len(guid_str)) 279 raise ValueError('Invalid GUID hex size') 280 281 time_low = 0 282 time_mid = 0 283 time_high = 0 284 nodes = [] 285 286 for i in reversed(range(16, 32, 2)): 287 h = guid_str[i:i + 2] 288 value = int(h, 16) 289 nodes.insert(0, value) 290 291 time_high = int(guid_str[12:16], 16) 292 time_mid = int(guid_str[8:12], 16) 293 time_low = int(guid_str[0:8], 16) 294 295 return cls(time_low, time_mid, time_high, nodes) 296 297 def __str__(self): 298 """Output a GUID value on its default string representation""" 299 300 clock = self.nodes[0] << 8 | self.nodes[1] 301 302 node = 0 303 for i in range(2, len(self.nodes)): 304 node = node << 8 | self.nodes[i] 305 306 s = f"{self.time_low:08x}-{self.time_mid:04x}-" 307 s += f"{self.time_high:04x}-{clock:04x}-{node:012x}" 308 return s 309 310 def to_bytes(self): 311 """Output a GUID value in bytes""" 312 313 data = bytearray() 314 315 util.data_add(data, self.time_low, 4) 316 util.data_add(data, self.time_mid, 2) 317 util.data_add(data, self.time_high, 2) 318 data.extend(bytearray(self.nodes)) 319 320 return data 321 322class qmp: 323 """ 324 Opens a connection and send/receive QMP commands. 325 """ 326 327 def send_cmd(self, command, args=None, may_open=False, return_error=True): 328 """Send a command to QMP, optinally opening a connection""" 329 330 if may_open: 331 self._connect() 332 elif not self.connected: 333 return False 334 335 msg = { 'execute': command } 336 if args: 337 msg['arguments'] = args 338 339 try: 340 obj = self.qmp_monitor.cmd_obj(msg) 341 # Can we use some other exception class here? 342 except Exception as e: # pylint: disable=W0718 343 print(f"Command: {command}") 344 print(f"Failed to inject error: {e}.") 345 return None 346 347 if "return" in obj: 348 if isinstance(obj.get("return"), dict): 349 if obj["return"]: 350 return obj["return"] 351 return "OK" 352 353 return obj["return"] 354 355 if isinstance(obj.get("error"), dict): 356 error = obj["error"] 357 if return_error: 358 print(f"Command: {msg}") 359 print(f'{error["class"]}: {error["desc"]}') 360 else: 361 print(json.dumps(obj)) 362 363 return None 364 365 def _close(self): 366 """Shutdown and close the socket, if opened""" 367 if not self.connected: 368 return 369 370 self.qmp_monitor.close() 371 self.connected = False 372 373 def _connect(self): 374 """Connect to a QMP TCP/IP port, if not connected yet""" 375 376 if self.connected: 377 return True 378 379 try: 380 self.qmp_monitor.connect(negotiate=True) 381 except ConnectionError: 382 sys.exit(f"Can't connect to QMP host {self.host}:{self.port}") 383 384 self.connected = True 385 386 return True 387 388 BLOCK_STATUS_BITS = { 389 "uncorrectable": util.bit(0), 390 "correctable": util.bit(1), 391 "multi-uncorrectable": util.bit(2), 392 "multi-correctable": util.bit(3), 393 } 394 395 ERROR_SEVERITY = { 396 "recoverable": 0, 397 "fatal": 1, 398 "corrected": 2, 399 "none": 3, 400 } 401 402 VALIDATION_BITS = { 403 "fru-id": util.bit(0), 404 "fru-text": util.bit(1), 405 "timestamp": util.bit(2), 406 } 407 408 GEDB_FLAGS_BITS = { 409 "recovered": util.bit(0), 410 "prev-error": util.bit(1), 411 "simulated": util.bit(2), 412 } 413 414 GENERIC_DATA_SIZE = 72 415 416 def argparse(parser): 417 """Prepare a parser group to query generic error data""" 418 419 block_status_bits = ",".join(qmp.BLOCK_STATUS_BITS.keys()) 420 error_severity_enum = ",".join(qmp.ERROR_SEVERITY.keys()) 421 validation_bits = ",".join(qmp.VALIDATION_BITS.keys()) 422 gedb_flags_bits = ",".join(qmp.GEDB_FLAGS_BITS.keys()) 423 424 g_gen = parser.add_argument_group("Generic Error Data") # pylint: disable=E1101 425 g_gen.add_argument("--block-status", 426 help=f"block status bits: {block_status_bits}") 427 g_gen.add_argument("--raw-data", nargs="+", 428 help="Raw data inside the Error Status Block") 429 g_gen.add_argument("--error-severity", "--severity", 430 help=f"error severity: {error_severity_enum}") 431 g_gen.add_argument("--gen-err-valid-bits", 432 "--generic-error-validation-bits", 433 help=f"validation bits: {validation_bits}") 434 g_gen.add_argument("--fru-id", type=guid.UUID, 435 help="GUID representing a physical device") 436 g_gen.add_argument("--fru-text", 437 help="ASCII string identifying the FRU hardware") 438 g_gen.add_argument("--timestamp", type=util.time, 439 help="Time when the error info was collected") 440 g_gen.add_argument("--precise", "--precise-timestamp", 441 action='store_true', 442 help="Marks the timestamp as precise if --timestamp is used") 443 g_gen.add_argument("--gedb-flags", 444 help=f"General Error Data Block flags: {gedb_flags_bits}") 445 446 def set_args(self, args): 447 """Set the arguments optionally defined via self.argparse()""" 448 449 if args.block_status: 450 self.block_status = util.get_choice(name="block-status", 451 value=args.block_status, 452 choices=self.BLOCK_STATUS_BITS, 453 bitmask=False) 454 if args.raw_data: 455 self.raw_data = util.get_array("raw-data", args.raw_data, 456 max_val=255) 457 print(self.raw_data) 458 459 if args.error_severity: 460 self.error_severity = util.get_choice(name="error-severity", 461 value=args.error_severity, 462 choices=self.ERROR_SEVERITY, 463 bitmask=False) 464 465 if args.fru_id: 466 self.fru_id = args.fru_id.to_bytes() 467 if not args.gen_err_valid_bits: 468 self.validation_bits |= self.VALIDATION_BITS["fru-id"] 469 470 if args.fru_text: 471 text = bytearray(args.fru_text.encode('ascii')) 472 if len(text) > 20: 473 sys.exit("FRU text is too big to fit") 474 475 self.fru_text = text 476 if not args.gen_err_valid_bits: 477 self.validation_bits |= self.VALIDATION_BITS["fru-text"] 478 479 if args.timestamp: 480 time = args.timestamp 481 century = int(time.year / 100) 482 483 bcd = bytearray() 484 util.data_add(bcd, (time.second // 10) << 4 | (time.second % 10), 1) 485 util.data_add(bcd, (time.minute // 10) << 4 | (time.minute % 10), 1) 486 util.data_add(bcd, (time.hour // 10) << 4 | (time.hour % 10), 1) 487 488 if args.precise: 489 util.data_add(bcd, 1, 1) 490 else: 491 util.data_add(bcd, 0, 1) 492 493 util.data_add(bcd, (time.day // 10) << 4 | (time.day % 10), 1) 494 util.data_add(bcd, (time.month // 10) << 4 | (time.month % 10), 1) 495 util.data_add(bcd, 496 ((time.year % 100) // 10) << 4 | (time.year % 10), 1) 497 util.data_add(bcd, ((century % 100) // 10) << 4 | (century % 10), 1) 498 499 self.timestamp = bcd 500 if not args.gen_err_valid_bits: 501 self.validation_bits |= self.VALIDATION_BITS["timestamp"] 502 503 if args.gen_err_valid_bits: 504 self.validation_bits = util.get_choice(name="validation", 505 value=args.gen_err_valid_bits, 506 choices=self.VALIDATION_BITS) 507 508 def __init__(self, host, port, debug=False): 509 """Initialize variables used by the QMP send logic""" 510 511 self.connected = False 512 self.host = host 513 self.port = port 514 self.debug = debug 515 516 # ACPI 6.1: 18.3.2.7.1 Generic Error Data: Generic Error Status Block 517 self.block_status = self.BLOCK_STATUS_BITS["uncorrectable"] 518 self.raw_data = [] 519 self.error_severity = self.ERROR_SEVERITY["recoverable"] 520 521 # ACPI 6.1: 18.3.2.7.1 Generic Error Data: Generic Error Data Entry 522 self.validation_bits = 0 523 self.flags = 0 524 self.fru_id = bytearray(16) 525 self.fru_text = bytearray(20) 526 self.timestamp = bytearray(8) 527 528 self.qmp_monitor = QEMUMonitorProtocol(address=(self.host, self.port)) 529 530 # 531 # Socket QMP send command 532 # 533 def send_cper_raw(self, cper_data): 534 """Send a raw CPER data to QEMU though QMP TCP socket""" 535 536 data = b64encode(bytes(cper_data)).decode('ascii') 537 538 cmd_arg = { 539 'cper': data 540 } 541 542 self._connect() 543 544 if self.send_cmd("inject-ghes-v2-error", cmd_arg): 545 print("Error injected.") 546 547 def send_cper(self, notif_type, payload): 548 """Send commands to QEMU though QMP TCP socket""" 549 550 # Fill CPER record header 551 552 # NOTE: bits 4 to 13 of block status contain the number of 553 # data entries in the data section. This is currently unsupported. 554 555 cper_length = len(payload) 556 data_length = cper_length + len(self.raw_data) + self.GENERIC_DATA_SIZE 557 558 # Generic Error Data Entry 559 gede = bytearray() 560 561 gede.extend(notif_type.to_bytes()) 562 util.data_add(gede, self.error_severity, 4) 563 util.data_add(gede, 0x300, 2) 564 util.data_add(gede, self.validation_bits, 1) 565 util.data_add(gede, self.flags, 1) 566 util.data_add(gede, cper_length, 4) 567 gede.extend(self.fru_id) 568 gede.extend(self.fru_text) 569 gede.extend(self.timestamp) 570 571 # Generic Error Status Block 572 gebs = bytearray() 573 574 if self.raw_data: 575 raw_data_offset = len(gebs) 576 else: 577 raw_data_offset = 0 578 579 util.data_add(gebs, self.block_status, 4) 580 util.data_add(gebs, raw_data_offset, 4) 581 util.data_add(gebs, len(self.raw_data), 4) 582 util.data_add(gebs, data_length, 4) 583 util.data_add(gebs, self.error_severity, 4) 584 585 cper_data = bytearray() 586 cper_data.extend(gebs) 587 cper_data.extend(gede) 588 cper_data.extend(bytearray(self.raw_data)) 589 cper_data.extend(bytearray(payload)) 590 591 if self.debug: 592 print(f"GUID: {notif_type}") 593 594 util.dump_bytearray("Generic Error Status Block", gebs) 595 util.dump_bytearray("Generic Error Data Entry", gede) 596 597 if self.raw_data: 598 util.dump_bytearray("Raw data", bytearray(self.raw_data)) 599 600 util.dump_bytearray("Payload", payload) 601 602 self.send_cper_raw(cper_data) 603 604 605 def search_qom(self, path, prop, regex): 606 """ 607 Return a list of devices that match path array like: 608 609 /machine/unattached/device 610 /machine/peripheral-anon/device 611 ... 612 """ 613 614 found = [] 615 616 i = 0 617 while 1: 618 dev = f"{path}[{i}]" 619 args = { 620 'path': dev, 621 'property': prop 622 } 623 ret = self.send_cmd("qom-get", args, may_open=True, 624 return_error=False) 625 if not ret: 626 break 627 628 if isinstance(ret, str): 629 if regex.search(ret): 630 found.append(dev) 631 632 i += 1 633 if i > 10000: 634 print("Too many objects returned by qom-get!") 635 break 636 637 return found 638 639class cper_guid: 640 """ 641 Contains CPER GUID, as per: 642 https://uefi.org/specs/UEFI/2.10/Apx_N_Common_Platform_Error_Record.html 643 """ 644 645 CPER_PROC_GENERIC = guid(0x9876CCAD, 0x47B4, 0x4bdb, 646 [0xB6, 0x5E, 0x16, 0xF1, 647 0x93, 0xC4, 0xF3, 0xDB]) 648 649 CPER_PROC_X86 = guid(0xDC3EA0B0, 0xA144, 0x4797, 650 [0xB9, 0x5B, 0x53, 0xFA, 651 0x24, 0x2B, 0x6E, 0x1D]) 652 653 CPER_PROC_ITANIUM = guid(0xe429faf1, 0x3cb7, 0x11d4, 654 [0xbc, 0xa7, 0x00, 0x80, 655 0xc7, 0x3c, 0x88, 0x81]) 656 657 CPER_PROC_ARM = guid(0xE19E3D16, 0xBC11, 0x11E4, 658 [0x9C, 0xAA, 0xC2, 0x05, 659 0x1D, 0x5D, 0x46, 0xB0]) 660 661 CPER_PLATFORM_MEM = guid(0xA5BC1114, 0x6F64, 0x4EDE, 662 [0xB8, 0x63, 0x3E, 0x83, 663 0xED, 0x7C, 0x83, 0xB1]) 664 665 CPER_PLATFORM_MEM2 = guid(0x61EC04FC, 0x48E6, 0xD813, 666 [0x25, 0xC9, 0x8D, 0xAA, 667 0x44, 0x75, 0x0B, 0x12]) 668 669 CPER_PCIE = guid(0xD995E954, 0xBBC1, 0x430F, 670 [0xAD, 0x91, 0xB4, 0x4D, 671 0xCB, 0x3C, 0x6F, 0x35]) 672 673 CPER_PCI_BUS = guid(0xC5753963, 0x3B84, 0x4095, 674 [0xBF, 0x78, 0xED, 0xDA, 675 0xD3, 0xF9, 0xC9, 0xDD]) 676 677 CPER_PCI_DEV = guid(0xEB5E4685, 0xCA66, 0x4769, 678 [0xB6, 0xA2, 0x26, 0x06, 679 0x8B, 0x00, 0x13, 0x26]) 680 681 CPER_FW_ERROR = guid(0x81212A96, 0x09ED, 0x4996, 682 [0x94, 0x71, 0x8D, 0x72, 683 0x9C, 0x8E, 0x69, 0xED]) 684 685 CPER_DMA_GENERIC = guid(0x5B51FEF7, 0xC79D, 0x4434, 686 [0x8F, 0x1B, 0xAA, 0x62, 687 0xDE, 0x3E, 0x2C, 0x64]) 688 689 CPER_DMA_VT = guid(0x71761D37, 0x32B2, 0x45cd, 690 [0xA7, 0xD0, 0xB0, 0xFE, 691 0xDD, 0x93, 0xE8, 0xCF]) 692 693 CPER_DMA_IOMMU = guid(0x036F84E1, 0x7F37, 0x428c, 694 [0xA7, 0x9E, 0x57, 0x5F, 695 0xDF, 0xAA, 0x84, 0xEC]) 696 697 CPER_CCIX_PER = guid(0x91335EF6, 0xEBFB, 0x4478, 698 [0xA6, 0xA6, 0x88, 0xB7, 699 0x28, 0xCF, 0x75, 0xD7]) 700 701 CPER_CXL_PROT_ERR = guid(0x80B9EFB4, 0x52B5, 0x4DE3, 702 [0xA7, 0x77, 0x68, 0x78, 703 0x4B, 0x77, 0x10, 0x48]) 704