example-gatt-server 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. #!/usr/bin/env python3
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. import dbus
  4. import dbus.exceptions
  5. import dbus.mainloop.glib
  6. import dbus.service
  7. import array
  8. try:
  9. from gi.repository import GObject
  10. except ImportError:
  11. import gobject as GObject
  12. import sys
  13. from random import randint
# Main loop instance; assigned in main() and quit from
# register_app_error_cb() when application registration fails.
mainloop = None

# Bus name used with bus.get_object() to reach the BlueZ daemon.
BLUEZ_SERVICE_NAME = 'org.bluez'
# Interface looked up by find_adapter() and used to register the application.
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
# Interface that carries GetManagedObjects.
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
# Interface that carries GetAll and the PropertiesChanged signal.
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
# Interfaces implemented by the Service, Characteristic and Descriptor classes.
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
  22. class InvalidArgsException(dbus.exceptions.DBusException):
  23. _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
  24. class NotSupportedException(dbus.exceptions.DBusException):
  25. _dbus_error_name = 'org.bluez.Error.NotSupported'
  26. class NotPermittedException(dbus.exceptions.DBusException):
  27. _dbus_error_name = 'org.bluez.Error.NotPermitted'
  28. class InvalidValueLengthException(dbus.exceptions.DBusException):
  29. _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
  30. class FailedException(dbus.exceptions.DBusException):
  31. _dbus_error_name = 'org.bluez.Error.Failed'
  32. class Application(dbus.service.Object):
  33. """
  34. org.bluez.GattApplication1 interface implementation
  35. """
  36. def __init__(self, bus):
  37. self.path = '/'
  38. self.services = []
  39. dbus.service.Object.__init__(self, bus, self.path)
  40. self.add_service(HeartRateService(bus, 0))
  41. self.add_service(BatteryService(bus, 1))
  42. self.add_service(TestService(bus, 2))
  43. def get_path(self):
  44. return dbus.ObjectPath(self.path)
  45. def add_service(self, service):
  46. self.services.append(service)
  47. @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
  48. def GetManagedObjects(self):
  49. response = {}
  50. print('GetManagedObjects')
  51. for service in self.services:
  52. response[service.get_path()] = service.get_properties()
  53. chrcs = service.get_characteristics()
  54. for chrc in chrcs:
  55. response[chrc.get_path()] = chrc.get_properties()
  56. descs = chrc.get_descriptors()
  57. for desc in descs:
  58. response[desc.get_path()] = desc.get_properties()
  59. return response
  60. class Service(dbus.service.Object):
  61. """
  62. org.bluez.GattService1 interface implementation
  63. """
  64. PATH_BASE = '/org/bluez/example/service'
  65. def __init__(self, bus, index, uuid, primary):
  66. self.path = self.PATH_BASE + str(index)
  67. self.bus = bus
  68. self.uuid = uuid
  69. self.primary = primary
  70. self.characteristics = []
  71. dbus.service.Object.__init__(self, bus, self.path)
  72. def get_properties(self):
  73. return {
  74. GATT_SERVICE_IFACE: {
  75. 'UUID': self.uuid,
  76. 'Primary': self.primary,
  77. 'Characteristics': dbus.Array(
  78. self.get_characteristic_paths(),
  79. signature='o')
  80. }
  81. }
  82. def get_path(self):
  83. return dbus.ObjectPath(self.path)
  84. def add_characteristic(self, characteristic):
  85. self.characteristics.append(characteristic)
  86. def get_characteristic_paths(self):
  87. result = []
  88. for chrc in self.characteristics:
  89. result.append(chrc.get_path())
  90. return result
  91. def get_characteristics(self):
  92. return self.characteristics
  93. @dbus.service.method(DBUS_PROP_IFACE,
  94. in_signature='s',
  95. out_signature='a{sv}')
  96. def GetAll(self, interface):
  97. if interface != GATT_SERVICE_IFACE:
  98. raise InvalidArgsException()
  99. return self.get_properties()[GATT_SERVICE_IFACE]
  100. class Characteristic(dbus.service.Object):
  101. """
  102. org.bluez.GattCharacteristic1 interface implementation
  103. """
  104. def __init__(self, bus, index, uuid, flags, service):
  105. self.path = service.path + '/char' + str(index)
  106. self.bus = bus
  107. self.uuid = uuid
  108. self.service = service
  109. self.flags = flags
  110. self.descriptors = []
  111. dbus.service.Object.__init__(self, bus, self.path)
  112. def get_properties(self):
  113. return {
  114. GATT_CHRC_IFACE: {
  115. 'Service': self.service.get_path(),
  116. 'UUID': self.uuid,
  117. 'Flags': self.flags,
  118. 'Descriptors': dbus.Array(
  119. self.get_descriptor_paths(),
  120. signature='o')
  121. }
  122. }
  123. def get_path(self):
  124. return dbus.ObjectPath(self.path)
  125. def add_descriptor(self, descriptor):
  126. self.descriptors.append(descriptor)
  127. def get_descriptor_paths(self):
  128. result = []
  129. for desc in self.descriptors:
  130. result.append(desc.get_path())
  131. return result
  132. def get_descriptors(self):
  133. return self.descriptors
  134. @dbus.service.method(DBUS_PROP_IFACE,
  135. in_signature='s',
  136. out_signature='a{sv}')
  137. def GetAll(self, interface):
  138. if interface != GATT_CHRC_IFACE:
  139. raise InvalidArgsException()
  140. return self.get_properties()[GATT_CHRC_IFACE]
  141. @dbus.service.method(GATT_CHRC_IFACE,
  142. in_signature='a{sv}',
  143. out_signature='ay')
  144. def ReadValue(self, options):
  145. print('Default ReadValue called, returning error')
  146. raise NotSupportedException()
  147. @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
  148. def WriteValue(self, value, options):
  149. print('Default WriteValue called, returning error')
  150. raise NotSupportedException()
  151. @dbus.service.method(GATT_CHRC_IFACE)
  152. def StartNotify(self):
  153. print('Default StartNotify called, returning error')
  154. raise NotSupportedException()
  155. @dbus.service.method(GATT_CHRC_IFACE)
  156. def StopNotify(self):
  157. print('Default StopNotify called, returning error')
  158. raise NotSupportedException()
  159. @dbus.service.signal(DBUS_PROP_IFACE,
  160. signature='sa{sv}as')
  161. def PropertiesChanged(self, interface, changed, invalidated):
  162. pass
  163. class Descriptor(dbus.service.Object):
  164. """
  165. org.bluez.GattDescriptor1 interface implementation
  166. """
  167. def __init__(self, bus, index, uuid, flags, characteristic):
  168. self.path = characteristic.path + '/desc' + str(index)
  169. self.bus = bus
  170. self.uuid = uuid
  171. self.flags = flags
  172. self.chrc = characteristic
  173. dbus.service.Object.__init__(self, bus, self.path)
  174. def get_properties(self):
  175. return {
  176. GATT_DESC_IFACE: {
  177. 'Characteristic': self.chrc.get_path(),
  178. 'UUID': self.uuid,
  179. 'Flags': self.flags,
  180. }
  181. }
  182. def get_path(self):
  183. return dbus.ObjectPath(self.path)
  184. @dbus.service.method(DBUS_PROP_IFACE,
  185. in_signature='s',
  186. out_signature='a{sv}')
  187. def GetAll(self, interface):
  188. if interface != GATT_DESC_IFACE:
  189. raise InvalidArgsException()
  190. return self.get_properties()[GATT_DESC_IFACE]
  191. @dbus.service.method(GATT_DESC_IFACE,
  192. in_signature='a{sv}',
  193. out_signature='ay')
  194. def ReadValue(self, options):
  195. print ('Default ReadValue called, returning error')
  196. raise NotSupportedException()
  197. @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
  198. def WriteValue(self, value, options):
  199. print('Default WriteValue called, returning error')
  200. raise NotSupportedException()
  201. class HeartRateService(Service):
  202. """
  203. Fake Heart Rate Service that simulates a fake heart beat and control point
  204. behavior.
  205. """
  206. HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
  207. def __init__(self, bus, index):
  208. Service.__init__(self, bus, index, self.HR_UUID, True)
  209. self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self))
  210. self.add_characteristic(BodySensorLocationChrc(bus, 1, self))
  211. self.add_characteristic(HeartRateControlPointChrc(bus, 2, self))
  212. self.energy_expended = 0
  213. class HeartRateMeasurementChrc(Characteristic):
  214. HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
  215. def __init__(self, bus, index, service):
  216. Characteristic.__init__(
  217. self, bus, index,
  218. self.HR_MSRMT_UUID,
  219. ['notify'],
  220. service)
  221. self.notifying = False
  222. self.hr_ee_count = 0
  223. def hr_msrmt_cb(self):
  224. value = []
  225. value.append(dbus.Byte(0x06))
  226. value.append(dbus.Byte(randint(90, 130)))
  227. if self.hr_ee_count % 10 == 0:
  228. value[0] = dbus.Byte(value[0] | 0x08)
  229. value.append(dbus.Byte(self.service.energy_expended & 0xff))
  230. value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))
  231. self.service.energy_expended = \
  232. min(0xffff, self.service.energy_expended + 1)
  233. self.hr_ee_count += 1
  234. print('Updating value: ' + repr(value))
  235. self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])
  236. return self.notifying
  237. def _update_hr_msrmt_simulation(self):
  238. print('Update HR Measurement Simulation')
  239. if not self.notifying:
  240. return
  241. GObject.timeout_add(1000, self.hr_msrmt_cb)
  242. def StartNotify(self):
  243. if self.notifying:
  244. print('Already notifying, nothing to do')
  245. return
  246. self.notifying = True
  247. self._update_hr_msrmt_simulation()
  248. def StopNotify(self):
  249. if not self.notifying:
  250. print('Not notifying, nothing to do')
  251. return
  252. self.notifying = False
  253. self._update_hr_msrmt_simulation()
  254. class BodySensorLocationChrc(Characteristic):
  255. BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
  256. def __init__(self, bus, index, service):
  257. Characteristic.__init__(
  258. self, bus, index,
  259. self.BODY_SNSR_LOC_UUID,
  260. ['read'],
  261. service)
  262. def ReadValue(self, options):
  263. # Return 'Chest' as the sensor location.
  264. return [ 0x01 ]
  265. class HeartRateControlPointChrc(Characteristic):
  266. HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
  267. def __init__(self, bus, index, service):
  268. Characteristic.__init__(
  269. self, bus, index,
  270. self.HR_CTRL_PT_UUID,
  271. ['write'],
  272. service)
  273. def WriteValue(self, value, options):
  274. print('Heart Rate Control Point WriteValue called')
  275. if len(value) != 1:
  276. raise InvalidValueLengthException()
  277. byte = value[0]
  278. print('Control Point value: ' + repr(byte))
  279. if byte != 1:
  280. raise FailedException("0x80")
  281. print('Energy Expended field reset!')
  282. self.service.energy_expended = 0
  283. class BatteryService(Service):
  284. """
  285. Fake Battery service that emulates a draining battery.
  286. """
  287. BATTERY_UUID = '180f'
  288. def __init__(self, bus, index):
  289. Service.__init__(self, bus, index, self.BATTERY_UUID, True)
  290. self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self))
  291. class BatteryLevelCharacteristic(Characteristic):
  292. """
  293. Fake Battery Level characteristic. The battery level is drained by 2 points
  294. every 5 seconds.
  295. """
  296. BATTERY_LVL_UUID = '2a19'
  297. def __init__(self, bus, index, service):
  298. Characteristic.__init__(
  299. self, bus, index,
  300. self.BATTERY_LVL_UUID,
  301. ['read', 'notify'],
  302. service)
  303. self.notifying = False
  304. self.battery_lvl = 100
  305. GObject.timeout_add(5000, self.drain_battery)
  306. def notify_battery_level(self):
  307. if not self.notifying:
  308. return
  309. self.PropertiesChanged(
  310. GATT_CHRC_IFACE,
  311. { 'Value': [dbus.Byte(self.battery_lvl)] }, [])
  312. def drain_battery(self):
  313. if not self.notifying:
  314. return True
  315. if self.battery_lvl > 0:
  316. self.battery_lvl -= 2
  317. if self.battery_lvl < 0:
  318. self.battery_lvl = 0
  319. print('Battery Level drained: ' + repr(self.battery_lvl))
  320. self.notify_battery_level()
  321. return True
  322. def ReadValue(self, options):
  323. print('Battery Level read: ' + repr(self.battery_lvl))
  324. return [dbus.Byte(self.battery_lvl)]
  325. def StartNotify(self):
  326. if self.notifying:
  327. print('Already notifying, nothing to do')
  328. return
  329. self.notifying = True
  330. self.notify_battery_level()
  331. def StopNotify(self):
  332. if not self.notifying:
  333. print('Not notifying, nothing to do')
  334. return
  335. self.notifying = False
  336. class TestService(Service):
  337. """
  338. Dummy test service that provides characteristics and descriptors that
  339. exercise various API functionality.
  340. """
  341. TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'
  342. def __init__(self, bus, index):
  343. Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)
  344. self.add_characteristic(TestCharacteristic(bus, 0, self))
  345. self.add_characteristic(TestEncryptCharacteristic(bus, 1, self))
  346. self.add_characteristic(TestSecureCharacteristic(bus, 2, self))
  347. class TestCharacteristic(Characteristic):
  348. """
  349. Dummy test characteristic. Allows writing arbitrary bytes to its value, and
  350. contains "extended properties", as well as a test descriptor.
  351. """
  352. TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'
  353. def __init__(self, bus, index, service):
  354. Characteristic.__init__(
  355. self, bus, index,
  356. self.TEST_CHRC_UUID,
  357. ['read', 'write', 'writable-auxiliaries'],
  358. service)
  359. self.value = []
  360. self.add_descriptor(TestDescriptor(bus, 0, self))
  361. self.add_descriptor(
  362. CharacteristicUserDescriptionDescriptor(bus, 1, self))
  363. def ReadValue(self, options):
  364. print('TestCharacteristic Read: ' + repr(self.value))
  365. return self.value
  366. def WriteValue(self, value, options):
  367. print('TestCharacteristic Write: ' + repr(value))
  368. self.value = value
  369. class TestDescriptor(Descriptor):
  370. """
  371. Dummy test descriptor. Returns a static value.
  372. """
  373. TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'
  374. def __init__(self, bus, index, characteristic):
  375. Descriptor.__init__(
  376. self, bus, index,
  377. self.TEST_DESC_UUID,
  378. ['read', 'write'],
  379. characteristic)
  380. def ReadValue(self, options):
  381. return [
  382. dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
  383. ]
  384. class CharacteristicUserDescriptionDescriptor(Descriptor):
  385. """
  386. Writable CUD descriptor.
  387. """
  388. CUD_UUID = '2901'
  389. def __init__(self, bus, index, characteristic):
  390. self.writable = 'writable-auxiliaries' in characteristic.flags
  391. self.value = array.array('B', b'This is a characteristic for testing')
  392. self.value = self.value.tolist()
  393. Descriptor.__init__(
  394. self, bus, index,
  395. self.CUD_UUID,
  396. ['read', 'write'],
  397. characteristic)
  398. def ReadValue(self, options):
  399. return self.value
  400. def WriteValue(self, value, options):
  401. if not self.writable:
  402. raise NotPermittedException()
  403. self.value = value
  404. class TestEncryptCharacteristic(Characteristic):
  405. """
  406. Dummy test characteristic requiring encryption.
  407. """
  408. TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef3'
  409. def __init__(self, bus, index, service):
  410. Characteristic.__init__(
  411. self, bus, index,
  412. self.TEST_CHRC_UUID,
  413. ['encrypt-read', 'encrypt-write'],
  414. service)
  415. self.value = []
  416. self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
  417. self.add_descriptor(
  418. CharacteristicUserDescriptionDescriptor(bus, 3, self))
  419. def ReadValue(self, options):
  420. print('TestEncryptCharacteristic Read: ' + repr(self.value))
  421. return self.value
  422. def WriteValue(self, value, options):
  423. print('TestEncryptCharacteristic Write: ' + repr(value))
  424. self.value = value
  425. class TestEncryptDescriptor(Descriptor):
  426. """
  427. Dummy test descriptor requiring encryption. Returns a static value.
  428. """
  429. TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef4'
  430. def __init__(self, bus, index, characteristic):
  431. Descriptor.__init__(
  432. self, bus, index,
  433. self.TEST_DESC_UUID,
  434. ['encrypt-read', 'encrypt-write'],
  435. characteristic)
  436. def ReadValue(self, options):
  437. return [
  438. dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
  439. ]
  440. class TestSecureCharacteristic(Characteristic):
  441. """
  442. Dummy test characteristic requiring secure connection.
  443. """
  444. TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'
  445. def __init__(self, bus, index, service):
  446. Characteristic.__init__(
  447. self, bus, index,
  448. self.TEST_CHRC_UUID,
  449. ['secure-read', 'secure-write'],
  450. service)
  451. self.value = []
  452. self.add_descriptor(TestSecureDescriptor(bus, 2, self))
  453. self.add_descriptor(
  454. CharacteristicUserDescriptionDescriptor(bus, 3, self))
  455. def ReadValue(self, options):
  456. print('TestSecureCharacteristic Read: ' + repr(self.value))
  457. return self.value
  458. def WriteValue(self, value, options):
  459. print('TestSecureCharacteristic Write: ' + repr(value))
  460. self.value = value
  461. class TestSecureDescriptor(Descriptor):
  462. """
  463. Dummy test descriptor requiring secure connection. Returns a static value.
  464. """
  465. TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'
  466. def __init__(self, bus, index, characteristic):
  467. Descriptor.__init__(
  468. self, bus, index,
  469. self.TEST_DESC_UUID,
  470. ['secure-read', 'secure-write'],
  471. characteristic)
  472. def ReadValue(self, options):
  473. return [
  474. dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
  475. ]
  476. def register_app_cb():
  477. print('GATT application registered')
  478. def register_app_error_cb(error):
  479. print('Failed to register application: ' + str(error))
  480. mainloop.quit()
  481. def find_adapter(bus):
  482. remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
  483. DBUS_OM_IFACE)
  484. objects = remote_om.GetManagedObjects()
  485. for o, props in objects.items():
  486. if GATT_MANAGER_IFACE in props.keys():
  487. return o
  488. return None
  489. def main():
  490. global mainloop
  491. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  492. bus = dbus.SystemBus()
  493. adapter = find_adapter(bus)
  494. if not adapter:
  495. print('GattManager1 interface not found')
  496. return
  497. service_manager = dbus.Interface(
  498. bus.get_object(BLUEZ_SERVICE_NAME, adapter),
  499. GATT_MANAGER_IFACE)
  500. app = Application(bus)
  501. mainloop = GObject.MainLoop()
  502. print('Registering GATT application...')
  503. service_manager.RegisterApplication(app.get_path(), {},
  504. reply_handler=register_app_cb,
  505. error_handler=register_app_error_cb)
  506. mainloop.run()
# Start the example server only when this file is executed as a script.
if __name__ == '__main__':
    main()