xref: /openbmc/bmcweb/scripts/websocket_test.py (revision 81d523a7)
1#!/usr/bin/env python3
2
3# Example of streaming sensor values from openbmc using the /subscribe api
4# requires websockets package to be installed
5
6import asyncio
7import ssl
8import websockets
9import base64
10import json
11import argparse
12
# Command-line interface: --host is mandatory; the credentials and the
# --ssl / --no-ssl switch fall back to the defaults given below.
parser = argparse.ArgumentParser()
parser.add_argument("--host", required=True, help="Host to connect to")
parser.add_argument(
    "--username",
    default="root",
    help="Username to connect with",
)
parser.add_argument("--password", default="0penBmc", help="Password to use")
parser.add_argument(
    "--ssl",
    action=argparse.BooleanOptionalAction,
    default=True,
)

# Parsed once at module level; hello() reads these values.
args = parser.parse_args()
22
# Unit label printed after each reading, keyed by the sensor type that
# hello() takes from the second-to-last component of the message path.
sensor_type_map = dict(
    voltage="Volts",
    power="Watts",
    fan="RPM",
    fan_tach="RPM",
    temperature="Degrees C",
    altitude="Meters",
    current="Amps",
    energy="Joules",
    cfm="CFM",
)
34
35
async def hello():
    """Subscribe to BMC sensor updates over a websocket and print each value.

    Opens ``ws[s]://<args.host>/subscribe`` with HTTP Basic credentials taken
    from ``args.username`` / ``args.password``, sends one subscription
    request for the ``xyz.openbmc_project.Sensor.Value`` interface under
    ``/xyz/openbmc_project/sensors``, then prints one line per received
    message that carries a ``Value`` property.

    The receive loop has no exit condition, so this coroutine only ends when
    an exception propagates (for example from ``websocket.recv()``).
    """
    protocol = 'wss' if args.ssl else 'ws'
    uri = '{}://{}/subscribe'.format(protocol, args.host)

    authbytes = "{}:{}".format(args.username, args.password).encode('ascii')
    auth = "Basic {}".format(base64.b64encode(authbytes).decode('ascii'))
    connect_kwargs = {"extra_headers": {"Authorization": auth}}

    if args.ssl:
        # Explicit equivalent of the deprecated bare ssl.SSLContext():
        # a client context that does NOT verify the server certificate or
        # hostname. check_hostname must be cleared before verify_mode can be
        # set to CERT_NONE.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        connect_kwargs["ssl"] = ssl_context

    async with websockets.connect(uri, **connect_kwargs) as websocket:
        request = json.dumps({
            "paths": ["/xyz/openbmc_project/sensors"],
            "interfaces": ["xyz.openbmc_project.Sensor.Value"]
        })
        await websocket.send(request)

        while True:
            payload = await websocket.recv()
            j = json.loads(payload)

            # The last two path components are used as sensor type and name.
            path = j.get("path", "unknown/unknown")
            parts = path.split("/")
            name = parts[-1]
            # A path without any "/" has no type component; fall back to
            # an empty type (and therefore no unit) instead of IndexError.
            sensor_type = parts[-2] if len(parts) > 1 else ""
            units = sensor_type_map.get(sensor_type, "")

            # Default must be a dict: the original list default raised
            # AttributeError on .get() when "properties" was absent.
            properties = j.get("properties", {})
            value = properties.get("Value")
            if value is None:
                # Message did not carry a Value update; nothing to print.
                continue
            print(f"{name:<20} {value:4.02f} {units}")
67
68
# asyncio.run() creates and closes the event loop itself; calling
# asyncio.get_event_loop() with no running loop is deprecated.
asyncio.run(hello())
70