xref: /openbmc/bmcweb/scripts/websocket_test.py (revision cb103130)
1#!/usr/bin/env python3
2
# Example of streaming sensor values from OpenBMC using the /subscribe API.
# Requires the websockets package to be installed.
5
6import asyncio
7import ssl
8import websockets
9import base64
10import json
11import argparse
12
# Command-line options. Parsing happens at import time; hello() reads the
# resulting module-level `args`.
parser = argparse.ArgumentParser()
parser.add_argument("--host", required=True, help="Host to connect to")
parser.add_argument(
    "--username", default="root", help="Username to connect with"
)
parser.add_argument("--password", default="0penBmc", help="Password to use")

args = parser.parse_args()
20
# Maps the sensor-type component of a sensor object path (the second-to-last
# path segment) to the unit label printed next to each reading.
sensor_type_map = {
    "voltage": "Volts",
    "power": "Watts",
    "fan": "RPM",
    "fan_tach": "RPM",
    "temperature": "Degrees C",
    "altitude": "Meters",
    "current": "Amps",
    "energy": "Joules",
    "cfm": "CFM",
}
32
33
async def hello():
    """Stream sensor values from the /subscribe websocket and print them.

    Connects to ``wss://<args.host>/subscribe`` with an HTTP Basic
    ``Authorization`` header built from ``args.username`` and
    ``args.password``, sends a subscription request for the
    ``xyz.openbmc_project.Sensor.Value`` interface under
    ``/xyz/openbmc_project/sensors``, and then prints one line per received
    message that carries a ``Value`` property. Messages without a ``Value``
    (or without a ``properties`` object at all) are skipped.

    The receive loop has no exit condition of its own; it ends only when
    ``recv()`` raises or the task is cancelled.
    """
    uri = "wss://{}/subscribe".format(args.host)

    # ssl.SSLContext() with no protocol argument is deprecated, so build the
    # client context explicitly. Certificate and hostname verification stay
    # disabled, which is what the bare constructor produced (presumably
    # because BMCs often serve self-signed certificates -- confirm before
    # relying on this against other hosts).
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.check_hostname = False  # must be cleared before CERT_NONE
    ssl_context.verify_mode = ssl.CERT_NONE

    credentials = "{}:{}".format(args.username, args.password).encode("ascii")
    auth = "Basic {}".format(base64.b64encode(credentials).decode("ascii"))
    headers = {"Authorization": auth}

    async with websockets.connect(
        uri, ssl=ssl_context, extra_headers=headers
    ) as websocket:
        request = json.dumps({
            "paths": ["/xyz/openbmc_project/sensors"],
            "interfaces": ["xyz.openbmc_project.Sensor.Value"]
        })
        await websocket.send(request)

        while True:
            payload = await websocket.recv()
            j = json.loads(payload)

            # The last two path segments are read as <sensor type>/<name>.
            path = j.get("path", "unknown/unknown")
            segments = path.split("/")
            name = segments[-1]
            # A path without any "/" has no type segment; print no units
            # for it instead of raising IndexError.
            sensor_type = segments[-2] if len(segments) > 1 else ""
            units = sensor_type_map.get(sensor_type, "")

            # Fall back to an empty dict (not a list) so the .get() below
            # works when "properties" is missing or null.
            properties = j.get("properties") or {}
            value = properties.get("Value")
            if value is None:
                continue
            print(f"{name:<20} {value:4.02f} {units}")
59
# asyncio.run() creates a fresh event loop, runs hello() on it and closes the
# loop afterwards; it replaces the deprecated pattern of calling
# get_event_loop() when no loop is running.
asyncio.run(hello())
61