example-gatt-client 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #!/usr/bin/env python3
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. import dbus
  4. try:
  5. from gi.repository import GObject
  6. except ImportError:
  7. import gobject as GObject
  8. import sys
  9. from dbus.mainloop.glib import DBusGMainLoop
  10. bus = None
  11. mainloop = None
  12. BLUEZ_SERVICE_NAME = 'org.bluez'
  13. DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
  14. DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
  15. GATT_SERVICE_IFACE = 'org.bluez.GattService1'
  16. GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
  17. HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
  18. HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
  19. BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
  20. HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
  21. # The objects that we interact with.
  22. hr_service = None
  23. hr_msrmt_chrc = None
  24. body_snsr_loc_chrc = None
  25. hr_ctrl_pt_chrc = None
  26. def generic_error_cb(error):
  27. print('D-Bus call failed: ' + str(error))
  28. mainloop.quit()
  29. def body_sensor_val_to_str(val):
  30. if val == 0:
  31. return 'Other'
  32. if val == 1:
  33. return 'Chest'
  34. if val == 2:
  35. return 'Wrist'
  36. if val == 3:
  37. return 'Finger'
  38. if val == 4:
  39. return 'Hand'
  40. if val == 5:
  41. return 'Ear Lobe'
  42. if val == 6:
  43. return 'Foot'
  44. return 'Reserved value'
  45. def sensor_contact_val_to_str(val):
  46. if val == 0 or val == 1:
  47. return 'not supported'
  48. if val == 2:
  49. return 'no contact detected'
  50. if val == 3:
  51. return 'contact detected'
  52. return 'invalid value'
  53. def body_sensor_val_cb(value):
  54. if len(value) != 1:
  55. print('Invalid body sensor location value: ' + repr(value))
  56. return
  57. print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
  58. def hr_msrmt_start_notify_cb():
  59. print('HR Measurement notifications enabled')
  60. def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
  61. if iface != GATT_CHRC_IFACE:
  62. return
  63. if not len(changed_props):
  64. return
  65. value = changed_props.get('Value', None)
  66. if not value:
  67. return
  68. print('New HR Measurement')
  69. flags = value[0]
  70. value_format = flags & 0x01
  71. sc_status = (flags >> 1) & 0x03
  72. ee_status = flags & 0x08
  73. if value_format == 0x00:
  74. hr_msrmt = value[1]
  75. next_ind = 2
  76. else:
  77. hr_msrmt = value[1] | (value[2] << 8)
  78. next_ind = 3
  79. print('\tHR: ' + str(int(hr_msrmt)))
  80. print('\tSensor Contact status: ' +
  81. sensor_contact_val_to_str(sc_status))
  82. if ee_status:
  83. print('\tEnergy Expended: ' + str(int(value[next_ind])))
  84. def start_client():
  85. # Read the Body Sensor Location value and print it asynchronously.
  86. body_snsr_loc_chrc[0].ReadValue({}, reply_handler=body_sensor_val_cb,
  87. error_handler=generic_error_cb,
  88. dbus_interface=GATT_CHRC_IFACE)
  89. # Listen to PropertiesChanged signals from the Heart Measurement
  90. # Characteristic.
  91. hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
  92. hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
  93. hr_msrmt_changed_cb)
  94. # Subscribe to Heart Rate Measurement notifications.
  95. hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
  96. error_handler=generic_error_cb,
  97. dbus_interface=GATT_CHRC_IFACE)
  98. def process_chrc(chrc_path):
  99. chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
  100. chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
  101. dbus_interface=DBUS_PROP_IFACE)
  102. uuid = chrc_props['UUID']
  103. if uuid == HR_MSRMT_UUID:
  104. global hr_msrmt_chrc
  105. hr_msrmt_chrc = (chrc, chrc_props)
  106. elif uuid == BODY_SNSR_LOC_UUID:
  107. global body_snsr_loc_chrc
  108. body_snsr_loc_chrc = (chrc, chrc_props)
  109. elif uuid == HR_CTRL_PT_UUID:
  110. global hr_ctrl_pt_chrc
  111. hr_ctrl_pt_chrc = (chrc, chrc_props)
  112. else:
  113. print('Unrecognized characteristic: ' + uuid)
  114. return True
  115. def process_hr_service(service_path, chrc_paths):
  116. service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
  117. service_props = service.GetAll(GATT_SERVICE_IFACE,
  118. dbus_interface=DBUS_PROP_IFACE)
  119. uuid = service_props['UUID']
  120. if uuid != HR_SVC_UUID:
  121. return False
  122. print('Heart Rate Service found: ' + service_path)
  123. # Process the characteristics.
  124. for chrc_path in chrc_paths:
  125. process_chrc(chrc_path)
  126. global hr_service
  127. hr_service = (service, service_props, service_path)
  128. return True
  129. def interfaces_removed_cb(object_path, interfaces):
  130. if not hr_service:
  131. return
  132. if object_path == hr_service[2]:
  133. print('Service was removed')
  134. mainloop.quit()
  135. def main():
  136. # Set up the main loop.
  137. DBusGMainLoop(set_as_default=True)
  138. global bus
  139. bus = dbus.SystemBus()
  140. global mainloop
  141. mainloop = GObject.MainLoop()
  142. om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
  143. om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
  144. print('Getting objects...')
  145. objects = om.GetManagedObjects()
  146. chrcs = []
  147. # List characteristics found
  148. for path, interfaces in objects.items():
  149. if GATT_CHRC_IFACE not in interfaces.keys():
  150. continue
  151. chrcs.append(path)
  152. # List sevices found
  153. for path, interfaces in objects.items():
  154. if GATT_SERVICE_IFACE not in interfaces.keys():
  155. continue
  156. chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
  157. if process_hr_service(path, chrc_paths):
  158. break
  159. if not hr_service:
  160. print('No Heart Rate Service found')
  161. sys.exit(1)
  162. start_client()
  163. mainloop.run()
  164. if __name__ == '__main__':
  165. main()