example-advertisement 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #!/usr/bin/python
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. from __future__ import print_function
  4. import argparse
  5. import dbus
  6. import dbus.exceptions
  7. import dbus.mainloop.glib
  8. import dbus.service
  9. import time
  10. import threading
  11. try:
  12. from gi.repository import GObject # python3
  13. except ImportError:
  14. import gobject as GObject # python2
  15. mainloop = None
  16. BLUEZ_SERVICE_NAME = 'org.bluez'
  17. LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
  18. DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
  19. DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
  20. LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
  21. class InvalidArgsException(dbus.exceptions.DBusException):
  22. _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
  23. class NotSupportedException(dbus.exceptions.DBusException):
  24. _dbus_error_name = 'org.bluez.Error.NotSupported'
  25. class NotPermittedException(dbus.exceptions.DBusException):
  26. _dbus_error_name = 'org.bluez.Error.NotPermitted'
  27. class InvalidValueLengthException(dbus.exceptions.DBusException):
  28. _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
  29. class FailedException(dbus.exceptions.DBusException):
  30. _dbus_error_name = 'org.bluez.Error.Failed'
  31. class Advertisement(dbus.service.Object):
  32. PATH_BASE = '/org/bluez/example/advertisement'
  33. def __init__(self, bus, index, advertising_type):
  34. self.path = self.PATH_BASE + str(index)
  35. self.bus = bus
  36. self.ad_type = advertising_type
  37. self.service_uuids = None
  38. self.manufacturer_data = None
  39. self.solicit_uuids = None
  40. self.service_data = None
  41. self.local_name = None
  42. self.include_tx_power = False
  43. self.data = None
  44. dbus.service.Object.__init__(self, bus, self.path)
  45. def get_properties(self):
  46. properties = dict()
  47. properties['Type'] = self.ad_type
  48. if self.service_uuids is not None:
  49. properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
  50. signature='s')
  51. if self.solicit_uuids is not None:
  52. properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
  53. signature='s')
  54. if self.manufacturer_data is not None:
  55. properties['ManufacturerData'] = dbus.Dictionary(
  56. self.manufacturer_data, signature='qv')
  57. if self.service_data is not None:
  58. properties['ServiceData'] = dbus.Dictionary(self.service_data,
  59. signature='sv')
  60. if self.local_name is not None:
  61. properties['LocalName'] = dbus.String(self.local_name)
  62. if self.include_tx_power:
  63. properties['Includes'] = dbus.Array(["tx-power"], signature='s')
  64. if self.data is not None:
  65. properties['Data'] = dbus.Dictionary(
  66. self.data, signature='yv')
  67. return {LE_ADVERTISEMENT_IFACE: properties}
  68. def get_path(self):
  69. return dbus.ObjectPath(self.path)
  70. def add_service_uuid(self, uuid):
  71. if not self.service_uuids:
  72. self.service_uuids = []
  73. self.service_uuids.append(uuid)
  74. def add_solicit_uuid(self, uuid):
  75. if not self.solicit_uuids:
  76. self.solicit_uuids = []
  77. self.solicit_uuids.append(uuid)
  78. def add_manufacturer_data(self, manuf_code, data):
  79. if not self.manufacturer_data:
  80. self.manufacturer_data = dbus.Dictionary({}, signature='qv')
  81. self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')
  82. def add_service_data(self, uuid, data):
  83. if not self.service_data:
  84. self.service_data = dbus.Dictionary({}, signature='sv')
  85. self.service_data[uuid] = dbus.Array(data, signature='y')
  86. def add_local_name(self, name):
  87. if not self.local_name:
  88. self.local_name = ""
  89. self.local_name = dbus.String(name)
  90. def add_data(self, ad_type, data):
  91. if not self.data:
  92. self.data = dbus.Dictionary({}, signature='yv')
  93. self.data[ad_type] = dbus.Array(data, signature='y')
  94. @dbus.service.method(DBUS_PROP_IFACE,
  95. in_signature='s',
  96. out_signature='a{sv}')
  97. def GetAll(self, interface):
  98. print('GetAll')
  99. if interface != LE_ADVERTISEMENT_IFACE:
  100. raise InvalidArgsException()
  101. print('returning props')
  102. return self.get_properties()[LE_ADVERTISEMENT_IFACE]
  103. @dbus.service.method(LE_ADVERTISEMENT_IFACE,
  104. in_signature='',
  105. out_signature='')
  106. def Release(self):
  107. print('%s: Released!' % self.path)
  108. class TestAdvertisement(Advertisement):
  109. def __init__(self, bus, index):
  110. Advertisement.__init__(self, bus, index, 'peripheral')
  111. self.add_service_uuid('180D')
  112. self.add_service_uuid('180F')
  113. self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03])
  114. self.add_service_data('9999', [0x00, 0x01, 0x02, 0x03, 0x04])
  115. self.add_local_name('TestAdvertisement')
  116. self.include_tx_power = True
  117. self.add_data(0x26, [0x01, 0x01, 0x00])
  118. def register_ad_cb():
  119. print('Advertisement registered')
  120. def register_ad_error_cb(error):
  121. print('Failed to register advertisement: ' + str(error))
  122. mainloop.quit()
  123. def find_adapter(bus):
  124. remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
  125. DBUS_OM_IFACE)
  126. objects = remote_om.GetManagedObjects()
  127. for o, props in objects.items():
  128. if LE_ADVERTISING_MANAGER_IFACE in props:
  129. return o
  130. return None
  131. def shutdown(timeout):
  132. print('Advertising for {} seconds...'.format(timeout))
  133. time.sleep(timeout)
  134. mainloop.quit()
  135. def main(timeout=0):
  136. global mainloop
  137. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  138. bus = dbus.SystemBus()
  139. adapter = find_adapter(bus)
  140. if not adapter:
  141. print('LEAdvertisingManager1 interface not found')
  142. return
  143. adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
  144. "org.freedesktop.DBus.Properties")
  145. adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
  146. ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
  147. LE_ADVERTISING_MANAGER_IFACE)
  148. test_advertisement = TestAdvertisement(bus, 0)
  149. mainloop = GObject.MainLoop()
  150. ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
  151. reply_handler=register_ad_cb,
  152. error_handler=register_ad_error_cb)
  153. if timeout > 0:
  154. threading.Thread(target=shutdown, args=(timeout,)).start()
  155. else:
  156. print('Advertising forever...')
  157. mainloop.run() # blocks until mainloop.quit() is called
  158. ad_manager.UnregisterAdvertisement(test_advertisement)
  159. print('Advertisement unregistered')
  160. dbus.service.Object.remove_from_connection(test_advertisement)
  161. if __name__ == '__main__':
  162. parser = argparse.ArgumentParser()
  163. parser.add_argument('--timeout', default=0, type=int, help="advertise " +
  164. "for this many seconds then stop, 0=run forever " +
  165. "(default: 0)")
  166. args = parser.parse_args()
  167. main(args.timeout)