xref: /openbmc/pyphosphor/obmc/dbuslib/bindings.py (revision 93d0cf05)
1# Contributors Listed Below - COPYRIGHT 2016
2# [+] International Business Machines Corp.
3#
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied. See the License for the specific language governing
15# permissions and limitations under the License.
16
import dbus
import dbus.service
18
# Object-path prefix string. Not referenced elsewhere in this module's
# visible code; presumably the root path under which OpenBMC D-Bus objects
# are published -- confirm against callers.
OBJ_PREFIX = '/org/openbmc'
20
21
def is_unique(connection):
    """Return True if *connection* starts with a colon.

    The check is purely textual: a name whose first character is ``':'``
    is reported as unique, anything else (including the empty string) is
    not.

    :param connection: bus/connection name as a string.
    :returns: ``True`` when the name begins with ``':'``, else ``False``.
    """
    # startswith() copes with an empty name, where indexing [0] would
    # raise IndexError.
    return connection.startswith(':')
24
25
def get_dbus():
    """Return the object produced by ``dbus.SystemBus()``.

    Exists so callers obtain their bus through one place instead of
    calling into ``dbus`` directly.
    """
    system_bus = dbus.SystemBus()
    return system_bus
28
29
class DbusProperties(dbus.service.Object):
    """Dictionary-backed implementation of the D-Bus Properties interface.

    Values live in ``self.properties``, a dict keyed by interface name whose
    values are ``{property_name: value}`` dicts.  ``PropertiesChanged`` is
    emitted only while signals are unmasked; a new instance starts masked
    (``_export`` is False).
    """

    def __init__(self):
        dbus.service.Object.__init__(self)
        # {interface_name: {property_name: value}}
        self.properties = {}
        self.object_path = ""
        # When False, Set/SetMultiple update state without emitting signals.
        self._export = False

    def unmask_signals(self):
        """Enable signal emission and forward the call along the MRO.

        The next class in the MRO is only called if it defines
        ``unmask_signals`` itself, so this works both standalone and when
        combined with other classes providing the same method.
        """
        self._export = True
        inst = super(DbusProperties, self)
        if hasattr(inst, 'unmask_signals'):
            inst.unmask_signals()

    def mask_signals(self):
        """Disable signal emission and forward the call along the MRO."""
        self._export = False
        inst = super(DbusProperties, self)
        if hasattr(inst, 'mask_signals'):
            inst.mask_signals()

    @dbus.service.method(
        dbus.PROPERTIES_IFACE,
        in_signature='ss', out_signature='v')
    def Get(self, interface_name, property_name):
        """Return the value of one property.

        :raises dbus.exceptions.DBusException: if the interface is unknown
            (raised by ``GetAll``) or the property is not present on it.
        """
        d = self.GetAll(interface_name)
        try:
            return d[property_name]
        except KeyError:
            # Only a missing key means "unknown property"; any other
            # failure is a real error and must not be masked.
            raise dbus.exceptions.DBusException(
                "org.freedesktop.UnknownProperty: "+property_name)

    @dbus.service.method(
        dbus.PROPERTIES_IFACE,
        in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface_name):
        """Return the property dict stored for *interface_name*.

        The stored dict itself is returned, not a copy.

        :raises dbus.exceptions.DBusException: if no properties are stored
            for the interface.
        """
        try:
            return self.properties[interface_name]
        except KeyError:
            raise dbus.exceptions.DBusException(
                "org.freedesktop.UnknownInterface: "+interface_name)

    @dbus.service.method(
        dbus.PROPERTIES_IFACE,
        in_signature='ssv')
    def Set(self, interface_name, property_name, new_value):
        """Store a property value, creating interface/property as needed.

        ``PropertiesChanged`` is emitted (when unmasked) only if the
        property is new or its value differs from the stored one.
        """
        props = self.properties.setdefault(interface_name, {})
        # Decide "new or changed" explicitly instead of relying on a
        # catch-all exception handler, so a failure while emitting the
        # signal can never trigger a second store-and-emit.
        if property_name in props and not (props[property_name] != new_value):
            return
        props[property_name] = new_value
        if self._export:
            self.PropertiesChanged(
                interface_name, {property_name: new_value}, [])

    @dbus.service.method(
        "org.openbmc.Object.Properties", in_signature='sa{sv}')
    def SetMultiple(self, interface_name, prop_dict):
        """Store several properties of one interface in a single call.

        If at least one property is new or changed and signals are
        unmasked, a single ``PropertiesChanged`` carrying the whole
        *prop_dict* (changed and unchanged entries alike) is emitted.
        """
        props = self.properties.setdefault(interface_name, {})

        value_changed = False
        for property_name, new_value in prop_dict.items():
            if property_name not in props or props[property_name] != new_value:
                props[property_name] = new_value
                value_changed = True

        if value_changed and self._export:
            self.PropertiesChanged(interface_name, prop_dict, [])

    @dbus.service.signal(
        dbus.PROPERTIES_IFACE, signature='sa{sv}as')
    def PropertiesChanged(
            self, interface_name, changed_properties, invalidated_properties):
        """Signal stub; the decorator performs the actual emission."""
        pass
118
119
class DbusObjectManager(dbus.service.Object):
    """Registry of child objects exposed through the ObjectManager interface.

    Children are kept in ``self.objects`` keyed by object path.  Each child
    is expected to expose a ``properties`` mapping (as ``DbusProperties``
    does).  ``InterfacesAdded``/``InterfacesRemoved`` are emitted only while
    signals are unmasked; a new instance starts masked.
    """

    def __init__(self):
        dbus.service.Object.__init__(self)
        # {object_path: child object}
        self.objects = {}
        # When False, add()/remove() update state without emitting signals.
        self._export = False

    def unmask_signals(self):
        """Enable signal emission and forward the call along the MRO.

        The next class in the MRO is only called if it defines
        ``unmask_signals`` itself.
        """
        self._export = True
        inst = super(DbusObjectManager, self)
        if hasattr(inst, 'unmask_signals'):
            inst.unmask_signals()

    def mask_signals(self):
        """Disable signal emission and forward the call along the MRO."""
        self._export = False
        inst = super(DbusObjectManager, self)
        if hasattr(inst, 'mask_signals'):
            inst.mask_signals()

    def add(self, object_path, obj):
        """Register *obj* under *object_path*, replacing any previous entry.

        Emits ``InterfacesAdded`` with ``obj.properties`` when unmasked.
        """
        self.objects[object_path] = obj
        if self._export:
            self.InterfacesAdded(object_path, obj.properties)

    def remove(self, object_path):
        """Unregister the object at *object_path* and detach it from the bus.

        Unknown paths are ignored.  Emits ``InterfacesRemoved`` with the
        object's interface names when unmasked.
        """
        obj = self.objects.pop(object_path, None)
        if obj is None:
            # Nothing registered here; without this guard the calls below
            # would fail with AttributeError on None.
            return
        obj.remove_from_connection()
        if self._export:
            # Pass a concrete list of interface names to the signal.
            self.InterfacesRemoved(object_path, list(obj.properties))

    def get(self, object_path, default=None):
        """Return the object registered at *object_path*, or *default*."""
        return self.objects.get(object_path, default)

    @dbus.service.method(
        "org.freedesktop.DBus.ObjectManager",
        in_signature='', out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return ``{object_path: properties}`` for every registered object.

        The outer dict is built fresh on each call; the per-object
        ``properties`` mappings are the live ones, not copies.
        """
        return {
            objpath: obj.properties
            for objpath, obj in self.objects.items()
        }

    @dbus.service.signal(
        "org.freedesktop.DBus.ObjectManager", signature='oa{sa{sv}}')
    def InterfacesAdded(self, object_path, properties):
        """Signal stub; the decorator performs the actual emission."""
        pass

    @dbus.service.signal(
        "org.freedesktop.DBus.ObjectManager", signature='oas')
    def InterfacesRemoved(self, object_path, interfaces):
        """Signal stub; the decorator performs the actual emission."""
        pass
170