1# Contributors Listed Below - COPYRIGHT 2016 2# [+] International Business Machines Corp. 3# 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 14# implied. See the License for the specific language governing 15# permissions and limitations under the License. 16 17import dbus 18import dbus.service 19import dbus.exceptions 20 21OBJ_PREFIX = '/org/openbmc' 22 23 24def is_unique(connection): 25 return connection[0] == ':' 26 27 28def get_dbus(): 29 return dbus.SystemBus() 30 31 32class DbusProperties(dbus.service.Object): 33 def __init__(self, **kw): 34 self.validator = kw.pop('validator', None) 35 super(DbusProperties, self).__init__(**kw) 36 self.properties = {} 37 self._export = False 38 39 def unmask_signals(self): 40 self._export = True 41 inst = super(DbusProperties, self) 42 if hasattr(inst, 'unmask_signals'): 43 inst.unmask_signals() 44 45 def mask_signals(self): 46 self._export = False 47 inst = super(DbusProperties, self) 48 if hasattr(inst, 'mask_signals'): 49 inst.mask_signals() 50 51 @dbus.service.method( 52 dbus.PROPERTIES_IFACE, 53 in_signature='ss', out_signature='v') 54 def Get(self, interface_name, property_name): 55 d = self.GetAll(interface_name) 56 try: 57 v = d[property_name] 58 return v 59 except: 60 raise dbus.exceptions.DBusException( 61 "org.freedesktop.UnknownProperty: "+property_name) 62 63 @dbus.service.method( 64 dbus.PROPERTIES_IFACE, 65 in_signature='s', out_signature='a{sv}') 66 def GetAll(self, interface_name): 67 try: 68 d = self.properties[interface_name] 69 return d 70 except: 71 raise dbus.exceptions.DBusException( 72 "org.freedesktop.UnknownInterface: "+interface_name) 73 74 @dbus.service.method( 75 dbus.PROPERTIES_IFACE, 76 in_signature='ssv') 77 def Set(self, interface_name, property_name, new_value): 78 if (interface_name not in self.properties): 79 self.properties[interface_name] = {} 80 81 if self.validator: 82 self.validator(interface_name, property_name, new_value) 83 84 try: 85 old_value = self.properties[interface_name][property_name] 86 if (old_value != new_value): 87 self.properties[interface_name][property_name] = new_value 88 if self._export: 89 self.PropertiesChanged( 90 interface_name, {property_name: new_value}, []) 91 92 except: 93 self.properties[interface_name][property_name] = new_value 94 if self._export: 95 self.PropertiesChanged( 96 interface_name, {property_name: new_value}, []) 97 98 @dbus.service.method( 99 "org.openbmc.Object.Properties", in_signature='sa{sv}') 100 def SetMultiple(self, interface_name, prop_dict): 101 if (interface_name not in self.properties): 102 self.properties[interface_name] = {} 103 104 value_changed = False 105 for property_name in prop_dict: 106 new_value = prop_dict[property_name] 107 try: 108 old_value = self.properties[interface_name][property_name] 109 if (old_value != new_value): 110 self.properties[interface_name][property_name] = new_value 111 value_changed = True 112 113 except: 114 self.properties[interface_name][property_name] = new_value 115 value_changed = True 116 if (value_changed is True and self._export): 117 self.PropertiesChanged(interface_name, prop_dict, []) 118 119 @dbus.service.signal( 120 dbus.PROPERTIES_IFACE, signature='sa{sv}as') 121 def PropertiesChanged( 122 self, interface_name, changed_properties, invalidated_properties): 123 pass 124 125 126class DbusObjectManager(dbus.service.Object): 127 def __init__(self, **kw): 128 super(DbusObjectManager, self).__init__(**kw) 129 self.objects = {} 130 self._export = False 131 132 def unmask_signals(self): 133 self._export = True 134 inst = super(DbusObjectManager, self) 135 if hasattr(inst, 'unmask_signals'): 136 inst.unmask_signals() 137 138 def mask_signals(self): 139 self._export = False 140 inst = super(DbusObjectManager, self) 141 if hasattr(inst, 'mask_signals'): 142 inst.mask_signals() 143 144 def add(self, object_path, obj): 145 self.objects[object_path] = obj 146 if self._export: 147 self.InterfacesAdded(object_path, obj.properties) 148 149 def remove(self, object_path): 150 obj = self.objects.pop(object_path, None) 151 obj.remove_from_connection() 152 if self._export: 153 self.InterfacesRemoved(object_path, obj.properties.keys()) 154 155 def get(self, object_path, default=None): 156 return self.objects.get(object_path, default) 157 158 @dbus.service.method( 159 "org.freedesktop.DBus.ObjectManager", 160 in_signature='', out_signature='a{oa{sa{sv}}}') 161 def GetManagedObjects(self): 162 data = {} 163 for objpath in self.objects.keys(): 164 data[objpath] = self.objects[objpath].properties 165 return data 166 167 @dbus.service.signal( 168 "org.freedesktop.DBus.ObjectManager", signature='oa{sa{sv}}') 169 def InterfacesAdded(self, object_path, properties): 170 pass 171 172 @dbus.service.signal( 173 "org.freedesktop.DBus.ObjectManager", signature='oas') 174 def InterfacesRemoved(self, object_path, interfaces): 175 pass 176