| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663 |
- #!/usr/bin/env python3
- # SPDX-License-Identifier: LGPL-2.1-or-later
- import dbus
- import dbus.exceptions
- import dbus.mainloop.glib
- import dbus.service
- import array
- try:
- from gi.repository import GObject
- except ImportError:
- import gobject as GObject
- import sys
- from random import randint
# GLib main loop instance; assigned in main() and quit from
# register_app_error_cb() when application registration fails.
mainloop = None

# Bus name used to look up BlueZ objects on the system bus.
BLUEZ_SERVICE_NAME = 'org.bluez'
# Interface an adapter object must expose to be picked by find_adapter().
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'

# Standard D-Bus interfaces used for object enumeration and properties.
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

# Interfaces implemented by the Service, Characteristic and Descriptor
# classes in this file.
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
class InvalidArgsException(dbus.exceptions.DBusException):
    """
    D-Bus error reported as org.freedesktop.DBus.Error.InvalidArgs.

    Raised by the GetAll() handlers when asked for an interface the object
    does not implement.
    """
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
    """
    D-Bus error reported as org.bluez.Error.NotSupported.

    Raised by the default ReadValue/WriteValue/StartNotify/StopNotify
    handlers that subclasses are expected to override.
    """
    _dbus_error_name = 'org.bluez.Error.NotSupported'
class NotPermittedException(dbus.exceptions.DBusException):
    """
    D-Bus error reported as org.bluez.Error.NotPermitted.
    """
    _dbus_error_name = 'org.bluez.Error.NotPermitted'
class InvalidValueLengthException(dbus.exceptions.DBusException):
    """
    D-Bus error reported as org.bluez.Error.InvalidValueLength.
    """
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
class FailedException(dbus.exceptions.DBusException):
    """
    D-Bus error reported as org.bluez.Error.Failed.
    """
    _dbus_error_name = 'org.bluez.Error.Failed'
class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation

    Exported at object path '/'. Owns the example services and reports the
    complete service/characteristic/descriptor tree via GetManagedObjects().
    """

    def __init__(self, bus):
        """Export the application on *bus* and create the three services."""
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        # The index handed to each service ends up in its object path.
        service_classes = (HeartRateService, BatteryService, TestService)
        for index, service_cls in enumerate(service_classes):
            self.add_service(service_cls(bus, index))

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Append *service* to the list of services this application owns."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """
        Return a dict mapping the object path of every service,
        characteristic and descriptor to its get_properties() result.
        """
        print('GetManagedObjects')
        managed = {}
        for service in self.services:
            managed[service.get_path()] = service.get_properties()
            for chrc in service.get_characteristics():
                managed[chrc.get_path()] = chrc.get_properties()
                for desc in chrc.get_descriptors():
                    managed[desc.get_path()] = desc.get_properties()
        return managed
class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation

    Base class for the example services; subclasses pass their UUID and add
    characteristics with add_characteristic().
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        """
        Export the service on *bus* at PATH_BASE followed by *index*.

        *uuid* and *primary* are stored and reported as the 'UUID' and
        'Primary' properties.
        """
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return {GATT_SERVICE_IFACE: {property name: value}}."""
        chrc_paths = dbus.Array(self.get_characteristic_paths(),
                                signature='o')
        service_props = {
            'UUID': self.uuid,
            'Primary': self.primary,
            'Characteristics': chrc_paths,
        }
        return {GATT_SERVICE_IFACE: service_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        """Append *characteristic* to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return the object paths of all added characteristics as a list."""
        return [chrc.get_path() for chrc in self.characteristics]

    def get_characteristics(self):
        """Return the list of added characteristics (not a copy)."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """
        Return the GattService1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        all_props = self.get_properties()
        return all_props[GATT_SERVICE_IFACE]
class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation

    Base class for the example characteristics. The default value and
    notification handlers all fail with NotSupportedException; subclasses
    override the ones matching their flags.
    """

    def __init__(self, bus, index, uuid, flags, service):
        """
        Export the characteristic on *bus* at '<service path>/char<index>'.

        *uuid* and *flags* are stored and reported as the 'UUID' and 'Flags'
        properties; *service* is the owning Service object.
        """
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return {GATT_CHRC_IFACE: {property name: value}}."""
        desc_paths = dbus.Array(self.get_descriptor_paths(), signature='o')
        chrc_props = {
            'Service': self.service.get_path(),
            'UUID': self.uuid,
            'Flags': self.flags,
            'Descriptors': desc_paths,
        }
        return {GATT_CHRC_IFACE: chrc_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        """Append *descriptor* to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all added descriptors as a list."""
        return [desc.get_path() for desc in self.descriptors]

    def get_descriptors(self):
        """Return the list of added descriptors (not a copy)."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """
        Return the GattCharacteristic1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()

        all_props = self.get_properties()
        return all_props[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """Signal stub; calling it emits PropertiesChanged on the bus."""
        pass
class Descriptor(dbus.service.Object):
    """
    org.bluez.GattDescriptor1 interface implementation

    Base class for the example descriptors. The default ReadValue and
    WriteValue handlers fail with NotSupportedException.
    """

    def __init__(self, bus, index, uuid, flags, characteristic):
        """
        Export the descriptor on *bus* at
        '<characteristic path>/desc<index>'.

        *uuid* and *flags* are stored and reported as the 'UUID' and 'Flags'
        properties; *characteristic* is the owning Characteristic object.
        """
        self.path = characteristic.path + '/desc' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return {GATT_DESC_IFACE: {property name: value}}."""
        desc_props = {
            'Characteristic': self.chrc.get_path(),
            'UUID': self.uuid,
            'Flags': self.flags,
        }
        return {GATT_DESC_IFACE: desc_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """
        Return the GattDescriptor1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface != GATT_DESC_IFACE:
            raise InvalidArgsException()

        all_props = self.get_properties()
        return all_props[GATT_DESC_IFACE]

    @dbus.service.method(GATT_DESC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('Default WriteValue called, returning error')
        raise NotSupportedException()
class HeartRateService(Service):
    """
    Fake Heart Rate Service that simulates a fake heart beat and control point
    behavior.

    Holds the shared 'energy_expended' counter that the measurement
    characteristic increments and the control point characteristic resets.
    """
    HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index):
        """Create the primary service and its three characteristics."""
        Service.__init__(self, bus, index, self.HR_UUID, True)
        chrc_classes = (
            HeartRateMeasurementChrc,
            BodySensorLocationChrc,
            HeartRateControlPointChrc,
        )
        for chrc_index, chrc_cls in enumerate(chrc_classes):
            self.add_characteristic(chrc_cls(bus, chrc_index, self))
        self.energy_expended = 0
class HeartRateMeasurementChrc(Characteristic):
    """
    Heart Rate Measurement characteristic (flag 'notify' only).

    While notifications are enabled, a one-second GLib timer emits a
    PropertiesChanged signal carrying a random heart rate between 90 and 130
    and, on every tenth sample, the service's energy-expended counter.
    """
    HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        """Export the characteristic; notifications start disabled."""
        Characteristic.__init__(
                self, bus, index,
                self.HR_MSRMT_UUID,
                ['notify'],
                service)
        self.notifying = False
        # Number of samples produced so far; selects the samples that carry
        # the energy-expended field.
        self.hr_ee_count = 0
        # GLib source id of the running simulation timer, or None when no
        # timer is armed. Kept so the timer can be cancelled on StopNotify
        # and is never armed twice.
        self._timer_id = None

    def hr_msrmt_cb(self):
        """
        Timer callback: build one measurement and emit it as the new 'Value'.

        Returns self.notifying, so the timer keeps firing only while
        notifications are enabled.
        """
        # Flags byte followed by the heart rate as a single byte.
        # NOTE(review): 0x06 presumably encodes "sensor contact supported and
        # detected" per the Heart Rate Service spec - confirm.
        value = [dbus.Byte(0x06), dbus.Byte(randint(90, 130))]

        if self.hr_ee_count % 10 == 0:
            # Flag bit 0x08 accompanies the two energy-expended bytes, which
            # are appended low byte first.
            value[0] = dbus.Byte(value[0] | 0x08)
            energy = self.service.energy_expended
            value.append(dbus.Byte(energy & 0xff))
            value.append(dbus.Byte((energy >> 8) & 0xff))

        # The counter saturates at 0xffff instead of wrapping.
        self.service.energy_expended = \
                min(0xffff, self.service.energy_expended + 1)
        self.hr_ee_count += 1

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        if not self.notifying:
            # Returning False drops the source, so forget its id as well.
            self._timer_id = None
        return self.notifying

    def _update_hr_msrmt_simulation(self):
        """
        Bring the simulation timer in line with self.notifying.

        Cancels the pending timer when notifications are off, and arms a
        one-second timer when they are on and none is pending. Without this
        bookkeeping a quick StopNotify/StartNotify pair would leave the old
        timer alive next to a new one and double the notification rate.
        """
        print('Update HR Measurement Simulation')

        if not self.notifying:
            if self._timer_id is not None:
                GObject.source_remove(self._timer_id)
                self._timer_id = None
            return

        if self._timer_id is None:
            self._timer_id = GObject.timeout_add(1000, self.hr_msrmt_cb)

    def StartNotify(self):
        """Enable notifications and start the simulation timer."""
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self._update_hr_msrmt_simulation()

    def StopNotify(self):
        """Disable notifications and cancel the simulation timer."""
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
        self._update_hr_msrmt_simulation()
class BodySensorLocationChrc(Characteristic):
    """
    Read-only Body Sensor Location characteristic with a fixed value.
    """
    BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        """Export the characteristic with the single flag 'read'."""
        Characteristic.__init__(
                self, bus, index,
                self.BODY_SNSR_LOC_UUID,
                ['read'],
                service)

    def ReadValue(self, options):
        """Return the one-byte location value; *options* is ignored."""
        # Return 'Chest' as the sensor location.
        chest = 0x01
        return [ chest ]
class HeartRateControlPointChrc(Characteristic):
    """
    Write-only Heart Rate Control Point characteristic.

    Writing the single byte 1 resets the service's energy-expended counter.
    """
    HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        """Export the characteristic with the single flag 'write'."""
        Characteristic.__init__(
                self, bus, index,
                self.HR_CTRL_PT_UUID,
                ['write'],
                service)

    def WriteValue(self, value, options):
        """
        Handle a control point write; *options* is ignored.

        Raises InvalidValueLengthException unless *value* holds exactly one
        byte, and FailedException("0x80") when that byte is not 1.
        """
        print('Heart Rate Control Point WriteValue called')

        if len(value) != 1:
            raise InvalidValueLengthException()

        command = value[0]
        print('Control Point value: ' + repr(command))

        if command != 1:
            raise FailedException("0x80")

        print('Energy Expended field reset!')
        self.service.energy_expended = 0
class BatteryService(Service):
    """
    Fake Battery service that emulates a draining battery.
    """
    BATTERY_UUID = '180f'

    def __init__(self, bus, index):
        """Create the primary service with one battery level characteristic."""
        Service.__init__(self, bus, index, self.BATTERY_UUID, True)
        level_chrc = BatteryLevelCharacteristic(bus, 0, self)
        self.add_characteristic(level_chrc)
class BatteryLevelCharacteristic(Characteristic):
    """
    Fake Battery Level characteristic. A timer fires every 5 seconds and,
    while notifications are enabled, drains the level by 2 points (never
    below 0) and notifies the new value.
    """
    BATTERY_LVL_UUID = '2a19'

    def __init__(self, bus, index, service):
        """Export the characteristic at level 100 and start the drain timer."""
        Characteristic.__init__(
                self, bus, index,
                self.BATTERY_LVL_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.battery_lvl = 100
        # The timer runs for the object's lifetime; drain_battery() itself
        # decides whether a tick has any effect.
        GObject.timeout_add(5000, self.drain_battery)

    def notify_battery_level(self):
        """Emit the current level as 'Value' if notifications are enabled."""
        if self.notifying:
            level_bytes = [dbus.Byte(self.battery_lvl)]
            self.PropertiesChanged(
                    GATT_CHRC_IFACE,
                    { 'Value': level_bytes }, [])

    def drain_battery(self):
        """
        Timer callback: lower the level and notify it.

        Does nothing while notifications are disabled. Always returns True
        so the timer keeps running.
        """
        if self.notifying:
            if self.battery_lvl > 0:
                # Clamp at zero so the level never goes negative.
                self.battery_lvl = max(0, self.battery_lvl - 2)
            print('Battery Level drained: ' + repr(self.battery_lvl))
            self.notify_battery_level()
        return True

    def ReadValue(self, options):
        """Return the current level as a one-byte list; *options* is ignored."""
        print('Battery Level read: ' + repr(self.battery_lvl))
        return [dbus.Byte(self.battery_lvl)]

    def StartNotify(self):
        """Enable notifications and send the current level immediately."""
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self.notify_battery_level()

    def StopNotify(self):
        """Disable notifications; the drain timer keeps running idle."""
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
class TestService(Service):
    """
    Dummy test service that provides characteristics and descriptors that
    exercise various API functionality.
    """
    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        """Create the primary service and its three test characteristics."""
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)
        chrc_classes = (
            TestCharacteristic,
            TestEncryptCharacteristic,
            TestSecureCharacteristic,
        )
        for chrc_index, chrc_cls in enumerate(chrc_classes):
            self.add_characteristic(chrc_cls(bus, chrc_index, self))
class TestCharacteristic(Characteristic):
    """
    Dummy test characteristic. Allows writing arbitrary bytes to its value, and
    contains "extended properties", as well as a test descriptor.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        """Export the characteristic with an empty value and two descriptors."""
        chrc_flags = ['read', 'write', 'writable-auxiliaries']
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                chrc_flags,
                service)
        self.value = []
        test_desc = TestDescriptor(bus, 0, self)
        self.add_descriptor(test_desc)
        user_desc = CharacteristicUserDescriptionDescriptor(bus, 1, self)
        self.add_descriptor(user_desc)

    def ReadValue(self, options):
        """Return the stored value; *options* is ignored."""
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        """Store *value* unchanged; *options* is ignored."""
        print('TestCharacteristic Write: ' + repr(value))
        self.value = value
class TestDescriptor(Descriptor):
    """
    Dummy test descriptor. Returns a static value.
    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'

    def __init__(self, bus, index, characteristic):
        """Export the descriptor with flags 'read' and 'write'."""
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['read', 'write'],
                characteristic)

    def ReadValue(self, options):
        """Return the characters of 'Test' as dbus bytes; *options* is ignored."""
        return [dbus.Byte(char) for char in 'Test']
class CharacteristicUserDescriptionDescriptor(Descriptor):
    """
    Writable CUD descriptor.

    Writes are accepted only when the owning characteristic carries the
    'writable-auxiliaries' flag.
    """
    CUD_UUID = '2901'

    def __init__(self, bus, index, characteristic):
        """Export the descriptor with a default description as its value."""
        self.writable = 'writable-auxiliaries' in characteristic.flags

        # The value is kept as a plain list of byte values.
        description = array.array(
                'B', b'This is a characteristic for testing')
        self.value = description.tolist()

        Descriptor.__init__(
                self, bus, index,
                self.CUD_UUID,
                ['read', 'write'],
                characteristic)

    def ReadValue(self, options):
        """Return the stored description; *options* is ignored."""
        return self.value

    def WriteValue(self, value, options):
        """
        Replace the stored description with *value*; *options* is ignored.

        Raises NotPermittedException when the descriptor is not writable.
        """
        if self.writable:
            self.value = value
        else:
            raise NotPermittedException()
class TestEncryptCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring encryption.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef3'

    def __init__(self, bus, index, service):
        """Export the characteristic with an empty value and two descriptors."""
        chrc_flags = ['encrypt-read', 'encrypt-write']
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                chrc_flags,
                service)
        self.value = []
        # Descriptor indexes for this characteristic are 2 and 3.
        encrypt_desc = TestEncryptDescriptor(bus, 2, self)
        self.add_descriptor(encrypt_desc)
        user_desc = CharacteristicUserDescriptionDescriptor(bus, 3, self)
        self.add_descriptor(user_desc)

    def ReadValue(self, options):
        """Return the stored value; *options* is ignored."""
        print('TestEncryptCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        """Store *value* unchanged; *options* is ignored."""
        print('TestEncryptCharacteristic Write: ' + repr(value))
        self.value = value
class TestEncryptDescriptor(Descriptor):
    """
    Dummy test descriptor requiring encryption. Returns a static value.
    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef4'

    def __init__(self, bus, index, characteristic):
        """Export the descriptor with flags 'encrypt-read', 'encrypt-write'."""
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['encrypt-read', 'encrypt-write'],
                characteristic)

    def ReadValue(self, options):
        """Return the characters of 'Test' as dbus bytes; *options* is ignored."""
        return [dbus.Byte(char) for char in 'Test']
class TestSecureCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring secure connection.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'

    def __init__(self, bus, index, service):
        """Export the characteristic with an empty value and two descriptors."""
        chrc_flags = ['secure-read', 'secure-write']
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                chrc_flags,
                service)
        self.value = []
        # Descriptor indexes for this characteristic are 2 and 3.
        secure_desc = TestSecureDescriptor(bus, 2, self)
        self.add_descriptor(secure_desc)
        user_desc = CharacteristicUserDescriptionDescriptor(bus, 3, self)
        self.add_descriptor(user_desc)

    def ReadValue(self, options):
        """Return the stored value; *options* is ignored."""
        print('TestSecureCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        """Store *value* unchanged; *options* is ignored."""
        print('TestSecureCharacteristic Write: ' + repr(value))
        self.value = value
class TestSecureDescriptor(Descriptor):
    """
    Dummy test descriptor requiring secure connection. Returns a static value.
    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'

    def __init__(self, bus, index, characteristic):
        """Export the descriptor with flags 'secure-read', 'secure-write'."""
        Descriptor.__init__(
                self, bus, index,
                self.TEST_DESC_UUID,
                ['secure-read', 'secure-write'],
                characteristic)

    def ReadValue(self, options):
        """Return the characters of 'Test' as dbus bytes; *options* is ignored."""
        return [dbus.Byte(char) for char in 'Test']
def register_app_cb():
    """Reply handler for RegisterApplication: report success on stdout."""
    print('GATT application registered')
def register_app_error_cb(error):
    """
    Error handler for RegisterApplication: print *error* and stop the
    module-level main loop.
    """
    message = 'Failed to register application: ' + str(error)
    print(message)
    mainloop.quit()
def find_adapter(bus):
    """
    Return the path of the first BlueZ object exposing GattManager1.

    Queries the ObjectManager at '/' of the BlueZ service on *bus* and
    returns None when no object implements GATT_MANAGER_IFACE.
    """
    bluez_root = bus.get_object(BLUEZ_SERVICE_NAME, '/')
    remote_om = dbus.Interface(bluez_root, DBUS_OM_IFACE)
    managed = remote_om.GetManagedObjects()

    candidates = (path for path, ifaces in managed.items()
                  if GATT_MANAGER_IFACE in ifaces)
    return next(candidates, None)
def main():
    """
    Register the example GATT application with BlueZ and run the main loop.

    Prints a message and returns without registering when no object with the
    GattManager1 interface is found.
    """
    global mainloop

    # The GLib main loop integration must be the default before the bus
    # connection is created.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    adapter_path = find_adapter(bus)
    if not adapter_path:
        print('GattManager1 interface not found')
        return

    adapter_obj = bus.get_object(BLUEZ_SERVICE_NAME, adapter_path)
    service_manager = dbus.Interface(adapter_obj, GATT_MANAGER_IFACE)

    app = Application(bus)

    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    # Registration is asynchronous: the outcome is delivered to one of the
    # two handlers once the main loop is running.
    service_manager.RegisterApplication(
            app.get_path(), {},
            reply_handler=register_app_cb,
            error_handler=register_app_error_cb)

    mainloop.run()
# Run the GATT server only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|