xref: /openbmc/pyphosphor/obmc/dbuslib/bindings.py (revision f99783be)
1# Contributors Listed Below - COPYRIGHT 2016
2# [+] International Business Machines Corp.
3#
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied. See the License for the specific language governing
15# permissions and limitations under the License.
16
17import dbus
18import dbus.service
19import dbus.exceptions
20
# Root of the OpenBMC D-Bus object path namespace.  Not referenced by the
# code visible in this module; presumably prepended to object paths by
# importers -- verify against callers.
OBJ_PREFIX = "/xyz/openbmc_project"
22
23
def is_unique(connection):
    """
    Return True if a D-Bus connection name is a unique name.

    Unique connection names are recognised by their leading ':'
    (for example ':1.42'); anything else, such as a well-known name
    like 'org.example.Service', yields False.

    Arguments:
    connection -- The connection (bus) name string to test.

    An empty string is not a unique name and yields False rather than
    raising IndexError.
    """

    return connection.startswith(':')
26
27
def get_dbus():
    """
    Return the bus object used by this package.

    This is whatever dbus.SystemBus() returns, i.e. the system bus
    rather than a session bus.
    """

    system_bus = dbus.SystemBus()
    return system_bus
30
31
class DbusProperties(dbus.service.Object):
    """
    A D-Bus object that keeps its properties in a per-interface dict.

    self.properties maps an interface name to a dict of
    {property name: value}.  The class implements Get, GetAll and Set
    on dbus.PROPERTIES_IFACE and SetMultiple on
    org.openbmc.Object.Properties.  PropertiesChanged is emitted only
    while signals are unmasked; a new instance starts masked.

    Keyword arguments:
    validator -- Optional callable invoked by Set() as
                 validator(interface_name, property_name, new_value)
                 before the value is stored.  If it raises, the
                 exception propagates and the value is not stored.
                 SetMultiple() does not consult the validator.

    All remaining keyword arguments are forwarded unchanged to the
    superclass constructor.
    """

    def __init__(self, **kw):
        # 'validator' must be removed before the remaining keywords are
        # forwarded to the superclass constructor.
        self.validator = kw.pop('validator', None)
        super(DbusProperties, self).__init__(**kw)
        self.properties = {}
        # Signals stay masked until unmask_signals() is called.
        self._export = False

    def unmask_signals(self):
        """
        Allow PropertiesChanged to be emitted from now on.

        The call is forwarded to the next class in the MRO when that
        class also provides unmask_signals (cooperative mix-ins).
        """
        self._export = True
        inst = super(DbusProperties, self)
        if hasattr(inst, 'unmask_signals'):
            inst.unmask_signals()

    def mask_signals(self):
        """
        Stop emitting PropertiesChanged.

        The call is forwarded to the next class in the MRO when that
        class also provides mask_signals (cooperative mix-ins).
        """
        self._export = False
        inst = super(DbusProperties, self)
        if hasattr(inst, 'mask_signals'):
            inst.mask_signals()

    def _update_property(self, interface_name, property_name, new_value):
        """
        Store new_value and report whether anything changed.

        The interface entry is created on demand.  Returns True when
        the property was missing or held a different value, False when
        the stored value already compared equal to new_value.
        """
        interface_properties = self.properties.setdefault(interface_name, {})
        if (property_name in interface_properties and
                not (interface_properties[property_name] != new_value)):
            return False
        interface_properties[property_name] = new_value
        return True

    @dbus.service.method(
        dbus.PROPERTIES_IFACE,
        in_signature='ss', out_signature='v')
    def Get(self, interface_name, property_name):
        """
        Return the value of one property.

        Raises a DBusException named
        org.freedesktop.DBus.Error.UnknownInterface (via GetAll) when
        the interface is not known, or
        org.freedesktop.DBus.Error.UnknownProperty when the interface
        has no such property.
        """
        interface_properties = self.GetAll(interface_name)
        try:
            return interface_properties[property_name]
        except KeyError:
            raise dbus.exceptions.DBusException(
                "Unknown property: '{}'".format(property_name),
                name="org.freedesktop.DBus.Error.UnknownProperty")

    @dbus.service.method(
        dbus.PROPERTIES_IFACE,
        in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface_name):
        """
        Return the {property name: value} dict of one interface.

        The stored dict itself is returned, not a copy.  Raises a
        DBusException named org.freedesktop.DBus.Error.UnknownInterface
        when the interface is not known.
        """
        try:
            return self.properties[interface_name]
        except KeyError:
            raise dbus.exceptions.DBusException(
                "Unknown interface: '{}'".format(interface_name),
                name="org.freedesktop.DBus.Error.UnknownInterface")

    @dbus.service.method(
        dbus.PROPERTIES_IFACE,
        in_signature='ssv')
    def Set(self, interface_name, property_name, new_value):
        """
        Validate and store one property, creating it if necessary.

        PropertiesChanged is emitted for the property when the stored
        value was added or changed and signals are unmasked.
        """
        # The interface entry is created before validation runs, so it
        # exists (possibly empty) even when the validator rejects the
        # value.
        self.properties.setdefault(interface_name, {})

        if self.validator:
            self.validator(interface_name, property_name, new_value)

        changed = self._update_property(
            interface_name, property_name, new_value)

        # Emit outside of any try block so that a failure while sending
        # the signal is never mistaken for a missing property.
        if changed and self._export:
            self.PropertiesChanged(
                interface_name, {property_name: new_value}, [])

    @dbus.service.method(
        "org.openbmc.Object.Properties", in_signature='sa{sv}')
    def SetMultiple(self, interface_name, prop_dict):
        """
        Store several properties of one interface in a single call.

        The validator is not consulted.  When at least one value was
        added or changed and signals are unmasked, a single
        PropertiesChanged signal carrying the whole prop_dict is
        emitted.
        """
        self.properties.setdefault(interface_name, {})

        value_changed = False
        for property_name in prop_dict:
            # Every entry must be stored, so do not short-circuit once
            # a change has been seen.
            if self._update_property(
                    interface_name, property_name, prop_dict[property_name]):
                value_changed = True

        if value_changed and self._export:
            self.PropertiesChanged(interface_name, prop_dict, [])

    @dbus.service.signal(
        dbus.PROPERTIES_IFACE, signature='sa{sv}as')
    def PropertiesChanged(
            self, interface_name, changed_properties, invalidated_properties):
        """Signal stub; the decorator performs the actual emission."""
        pass
126
127
def add_interfaces_to_class(cls, ifaces):
    """
    The built-in Introspect method in dbus-python doesn't find
    interfaces if the @method or @signal decorators aren't used
    (property-only interfaces).  Use this method on a class
    derived from dbus.service.Object to help the dbus-python provided
    Introspect method find these interfaces.

    Interfaces that are already present in the class table are left
    untouched, so calling this repeatedly is harmless.

    Arguments:
    cls -- The dbus.service.Object subclass to add interfaces to.
    ifaces -- The property-only interfaces to add to the class.
    """

    # The class table is keyed by the fully qualified class name; the key
    # does not depend on the interface, so build it once.
    class_table_key = '{}.{}'.format(cls.__module__, cls.__name__)
    for iface in ifaces:
        cls._dbus_class_table[class_table_key].setdefault(iface, {})
144
145
def add_interfaces(ifaces):
    """
    Class decorator form of add_interfaces_to_class.

    The decorated class gets its __init__ replaced by a wrapper that
    first runs the original constructor and then calls
    add_interfaces_to_class(cls, ifaces), so the interfaces are
    registered each time an instance is created.

    Arguments:
    ifaces -- The property-only interfaces to add to the class.
    """

    def class_decorator(cls):
        # Capture the constructor before it is replaced below.
        original_init = cls.__init__

        def ctor(obj, *args, **kwargs):
            original_init(obj, *args, **kwargs)
            add_interfaces_to_class(cls, ifaces)

        cls.__init__ = ctor
        return cls

    return class_decorator
161
162
class DbusObjectManager(dbus.service.Object):
    """
    A D-Bus object implementing org.freedesktop.DBus.ObjectManager.

    self.objects maps an object path to the object registered under it.
    Registered objects are expected to expose a 'properties' attribute
    (interface name -> {property name: value}) and a
    remove_from_connection() method, since add(), remove() and
    GetManagedObjects() use them.  InterfacesAdded / InterfacesRemoved
    are emitted only while signals are unmasked; a new instance starts
    masked.

    All keyword arguments are forwarded unchanged to the superclass
    constructor.
    """

    def __init__(self, **kw):
        super(DbusObjectManager, self).__init__(**kw)
        self.objects = {}
        # Signals stay masked until unmask_signals() is called.
        self._export = False

    def unmask_signals(self):
        """
        Allow InterfacesAdded / InterfacesRemoved to be emitted.

        The call is forwarded to the next class in the MRO when that
        class also provides unmask_signals (cooperative mix-ins).
        """
        self._export = True
        inst = super(DbusObjectManager, self)
        if hasattr(inst, 'unmask_signals'):
            inst.unmask_signals()

    def mask_signals(self):
        """
        Stop emitting InterfacesAdded / InterfacesRemoved.

        The call is forwarded to the next class in the MRO when that
        class also provides mask_signals (cooperative mix-ins).
        """
        self._export = False
        inst = super(DbusObjectManager, self)
        if hasattr(inst, 'mask_signals'):
            inst.mask_signals()

    def add(self, object_path, obj):
        """
        Register obj under object_path, replacing any previous entry.

        Emits InterfacesAdded with obj.properties when signals are
        unmasked.
        """
        self.objects[object_path] = obj
        if self._export:
            self.InterfacesAdded(object_path, obj.properties)

    def remove(self, object_path):
        """
        Unregister the object stored under object_path.

        The object's remove_from_connection() is called and, when
        signals are unmasked, InterfacesRemoved is emitted with the
        names of the interfaces in its properties.  Removing a path
        that is not registered is a no-op.
        """
        obj = self.objects.pop(object_path, None)
        if obj is None:
            # Nothing registered under this path; there is no object to
            # detach and no interfaces to announce as removed.
            return
        obj.remove_from_connection()
        if self._export:
            self.InterfacesRemoved(object_path, list(obj.properties.keys()))

    def get(self, object_path, default=None):
        """Return the object registered under object_path, or default."""
        return self.objects.get(object_path, default)

    @dbus.service.method(
        "org.freedesktop.DBus.ObjectManager",
        in_signature='', out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """
        Return {object path: properties} for every registered object.

        A new outer dict is built on each call; the per-object
        'properties' values are the objects' own dicts, not copies.
        """
        return {
            object_path: managed.properties
            for object_path, managed in self.objects.items()
        }

    @dbus.service.signal(
        "org.freedesktop.DBus.ObjectManager", signature='oa{sa{sv}}')
    def InterfacesAdded(self, object_path, properties):
        """Signal stub; the decorator performs the actual emission."""
        pass

    @dbus.service.signal(
        "org.freedesktop.DBus.ObjectManager", signature='oas')
    def InterfacesRemoved(self, object_path, interfaces):
        """Signal stub; the decorator performs the actual emission."""
        pass
213