| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- #!/usr/bin/env python3
- # SPDX-License-Identifier: LGPL-2.1-or-later
- import dbus
- try:
- from gi.repository import GObject
- except ImportError:
- import gobject as GObject
- import sys
- from dbus.mainloop.glib import DBusGMainLoop
- bus = None
- mainloop = None
- BLUEZ_SERVICE_NAME = 'org.bluez'
- DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
- DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
- GATT_SERVICE_IFACE = 'org.bluez.GattService1'
- GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
- HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
- HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
- BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
- HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
- # The objects that we interact with.
- hr_service = None
- hr_msrmt_chrc = None
- body_snsr_loc_chrc = None
- hr_ctrl_pt_chrc = None
- def generic_error_cb(error):
- print('D-Bus call failed: ' + str(error))
- mainloop.quit()
- def body_sensor_val_to_str(val):
- if val == 0:
- return 'Other'
- if val == 1:
- return 'Chest'
- if val == 2:
- return 'Wrist'
- if val == 3:
- return 'Finger'
- if val == 4:
- return 'Hand'
- if val == 5:
- return 'Ear Lobe'
- if val == 6:
- return 'Foot'
- return 'Reserved value'
- def sensor_contact_val_to_str(val):
- if val == 0 or val == 1:
- return 'not supported'
- if val == 2:
- return 'no contact detected'
- if val == 3:
- return 'contact detected'
- return 'invalid value'
- def body_sensor_val_cb(value):
- if len(value) != 1:
- print('Invalid body sensor location value: ' + repr(value))
- return
- print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
- def hr_msrmt_start_notify_cb():
- print('HR Measurement notifications enabled')
- def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
- if iface != GATT_CHRC_IFACE:
- return
- if not len(changed_props):
- return
- value = changed_props.get('Value', None)
- if not value:
- return
- print('New HR Measurement')
- flags = value[0]
- value_format = flags & 0x01
- sc_status = (flags >> 1) & 0x03
- ee_status = flags & 0x08
- if value_format == 0x00:
- hr_msrmt = value[1]
- next_ind = 2
- else:
- hr_msrmt = value[1] | (value[2] << 8)
- next_ind = 3
- print('\tHR: ' + str(int(hr_msrmt)))
- print('\tSensor Contact status: ' +
- sensor_contact_val_to_str(sc_status))
- if ee_status:
- print('\tEnergy Expended: ' + str(int(value[next_ind])))
- def start_client():
- # Read the Body Sensor Location value and print it asynchronously.
- body_snsr_loc_chrc[0].ReadValue({}, reply_handler=body_sensor_val_cb,
- error_handler=generic_error_cb,
- dbus_interface=GATT_CHRC_IFACE)
- # Listen to PropertiesChanged signals from the Heart Measurement
- # Characteristic.
- hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
- hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
- hr_msrmt_changed_cb)
- # Subscribe to Heart Rate Measurement notifications.
- hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
- error_handler=generic_error_cb,
- dbus_interface=GATT_CHRC_IFACE)
- def process_chrc(chrc_path):
- chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
- chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
- dbus_interface=DBUS_PROP_IFACE)
- uuid = chrc_props['UUID']
- if uuid == HR_MSRMT_UUID:
- global hr_msrmt_chrc
- hr_msrmt_chrc = (chrc, chrc_props)
- elif uuid == BODY_SNSR_LOC_UUID:
- global body_snsr_loc_chrc
- body_snsr_loc_chrc = (chrc, chrc_props)
- elif uuid == HR_CTRL_PT_UUID:
- global hr_ctrl_pt_chrc
- hr_ctrl_pt_chrc = (chrc, chrc_props)
- else:
- print('Unrecognized characteristic: ' + uuid)
- return True
- def process_hr_service(service_path, chrc_paths):
- service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
- service_props = service.GetAll(GATT_SERVICE_IFACE,
- dbus_interface=DBUS_PROP_IFACE)
- uuid = service_props['UUID']
- if uuid != HR_SVC_UUID:
- return False
- print('Heart Rate Service found: ' + service_path)
- # Process the characteristics.
- for chrc_path in chrc_paths:
- process_chrc(chrc_path)
- global hr_service
- hr_service = (service, service_props, service_path)
- return True
- def interfaces_removed_cb(object_path, interfaces):
- if not hr_service:
- return
- if object_path == hr_service[2]:
- print('Service was removed')
- mainloop.quit()
- def main():
- # Set up the main loop.
- DBusGMainLoop(set_as_default=True)
- global bus
- bus = dbus.SystemBus()
- global mainloop
- mainloop = GObject.MainLoop()
- om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
- om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
- print('Getting objects...')
- objects = om.GetManagedObjects()
- chrcs = []
- # List characteristics found
- for path, interfaces in objects.items():
- if GATT_CHRC_IFACE not in interfaces.keys():
- continue
- chrcs.append(path)
- # List sevices found
- for path, interfaces in objects.items():
- if GATT_SERVICE_IFACE not in interfaces.keys():
- continue
- chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
- if process_hr_service(path, chrc_paths):
- break
- if not hr_service:
- print('No Heart Rate Service found')
- sys.exit(1)
- start_client()
- mainloop.run()
- if __name__ == '__main__':
- main()
|