1# Contributors Listed Below - COPYRIGHT 2016 2# [+] International Business Machines Corp. 3# 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 14# implied. See the License for the specific language governing 15# permissions and limitations under the License. 16 17import dbus 18import dbus.service 19import dbus.exceptions 20 21OBJ_PREFIX = '/xyz/openbmc_project' 22 23 24def is_unique(connection): 25 return connection[0] == ':' 26 27 28def get_dbus(): 29 return dbus.SystemBus() 30 31 32class DbusProperties(dbus.service.Object): 33 def __init__(self, **kw): 34 self.validator = kw.pop('validator', None) 35 super(DbusProperties, self).__init__(**kw) 36 self.properties = {} 37 self._export = False 38 39 def unmask_signals(self): 40 self._export = True 41 inst = super(DbusProperties, self) 42 if hasattr(inst, 'unmask_signals'): 43 inst.unmask_signals() 44 45 def mask_signals(self): 46 self._export = False 47 inst = super(DbusProperties, self) 48 if hasattr(inst, 'mask_signals'): 49 inst.mask_signals() 50 51 @dbus.service.method( 52 dbus.PROPERTIES_IFACE, 53 in_signature='ss', out_signature='v') 54 def Get(self, interface_name, property_name): 55 d = self.GetAll(interface_name) 56 try: 57 v = d[property_name] 58 return v 59 except: 60 raise dbus.exceptions.DBusException( 61 "Unknown property: '{}'".format(property_name), 62 name="org.freedesktop.DBus.Error.UnknownProperty") 63 64 @dbus.service.method( 65 dbus.PROPERTIES_IFACE, 66 in_signature='s', out_signature='a{sv}') 67 def GetAll(self, interface_name): 68 try: 69 d = self.properties[interface_name] 70 return d 71 except: 72 raise dbus.exceptions.DBusException( 73 "Unknown interface: '{}'".format(interface_name), 74 name="org.freedesktop.DBus.Error.UnknownInterface") 75 76 @dbus.service.method( 77 dbus.PROPERTIES_IFACE, 78 in_signature='ssv') 79 def Set(self, interface_name, property_name, new_value): 80 if (interface_name not in self.properties): 81 self.properties[interface_name] = {} 82 83 if self.validator: 84 self.validator(interface_name, property_name, new_value) 85 86 try: 87 old_value = self.properties[interface_name][property_name] 88 if (old_value != new_value): 89 self.properties[interface_name][property_name] = new_value 90 if self._export: 91 self.PropertiesChanged( 92 interface_name, {property_name: new_value}, []) 93 94 except: 95 self.properties[interface_name][property_name] = new_value 96 if self._export: 97 self.PropertiesChanged( 98 interface_name, {property_name: new_value}, []) 99 100 @dbus.service.method( 101 "org.openbmc.Object.Properties", in_signature='sa{sv}') 102 def SetMultiple(self, interface_name, prop_dict): 103 if (interface_name not in self.properties): 104 self.properties[interface_name] = {} 105 106 value_changed = False 107 for property_name in prop_dict: 108 new_value = prop_dict[property_name] 109 try: 110 old_value = self.properties[interface_name][property_name] 111 if (old_value != new_value): 112 self.properties[interface_name][property_name] = new_value 113 value_changed = True 114 115 except: 116 self.properties[interface_name][property_name] = new_value 117 value_changed = True 118 if (value_changed is True and self._export): 119 self.PropertiesChanged(interface_name, prop_dict, []) 120 121 @dbus.service.signal( 122 dbus.PROPERTIES_IFACE, signature='sa{sv}as') 123 def PropertiesChanged( 124 self, interface_name, changed_properties, invalidated_properties): 125 pass 126 127 128def add_interfaces_to_class(cls, ifaces): 129 """ 130 The built-in Introspect method in dbus-python doesn't find 131 interfaces if the @method or @signal decorators aren't used 132 (property-only interfaces). Use this method on a class 133 derived from dbus.service.Object to help the dbus-python provided 134 Introspect method find these interfaces. 135 136 Arguments: 137 cls -- The dbus.service.Object superclass to add interfaces to. 138 ifaces -- The property-only interfaces to add to the class. 139 """ 140 141 for iface in ifaces: 142 class_table_key = '{}.{}'.format(cls.__module__, cls.__name__) 143 cls._dbus_class_table[class_table_key].setdefault(iface, {}) 144 145 146def add_interfaces(ifaces): 147 """ 148 A class decorator for add_interfaces_to_class. 149 """ 150 151 def decorator(cls): 152 undecorated = cls.__init__ 153 154 def ctor(obj, *a, **kw): 155 undecorated(obj, *a, **kw) 156 add_interfaces_to_class(cls, ifaces) 157 158 cls.__init__ = ctor 159 return cls 160 return decorator 161 162 163class DbusObjectManager(dbus.service.Object): 164 def __init__(self, **kw): 165 super(DbusObjectManager, self).__init__(**kw) 166 self.objects = {} 167 self._export = False 168 169 def unmask_signals(self): 170 self._export = True 171 inst = super(DbusObjectManager, self) 172 if hasattr(inst, 'unmask_signals'): 173 inst.unmask_signals() 174 175 def mask_signals(self): 176 self._export = False 177 inst = super(DbusObjectManager, self) 178 if hasattr(inst, 'mask_signals'): 179 inst.mask_signals() 180 181 def add(self, object_path, obj): 182 self.objects[object_path] = obj 183 if self._export: 184 self.InterfacesAdded(object_path, obj.properties) 185 186 def remove(self, object_path): 187 obj = self.objects.pop(object_path, None) 188 obj.remove_from_connection() 189 if self._export: 190 self.InterfacesRemoved(object_path, list(obj.properties.keys())) 191 192 def get(self, object_path, default=None): 193 return self.objects.get(object_path, default) 194 195 @dbus.service.method( 196 "org.freedesktop.DBus.ObjectManager", 197 in_signature='', out_signature='a{oa{sa{sv}}}') 198 def GetManagedObjects(self): 199 data = {} 200 for objpath in list(self.objects.keys()): 201 data[objpath] = self.objects[objpath].properties 202 return data 203 204 @dbus.service.signal( 205 "org.freedesktop.DBus.ObjectManager", signature='oa{sa{sv}}') 206 def InterfacesAdded(self, object_path, properties): 207 pass 208 209 @dbus.service.signal( 210 "org.freedesktop.DBus.ObjectManager", signature='oas') 211 def InterfacesRemoved(self, object_path, interfaces): 212 pass 213