| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- #!/usr/bin/python
- # SPDX-License-Identifier: LGPL-2.1-or-later
- from __future__ import print_function
- import argparse
- import dbus
- import dbus.exceptions
- import dbus.mainloop.glib
- import dbus.service
- import time
- import threading
- try:
- from gi.repository import GObject # python3
- except ImportError:
- import gobject as GObject # python2
- mainloop = None
- BLUEZ_SERVICE_NAME = 'org.bluez'
- LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
- DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
- DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
- LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
- class InvalidArgsException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
- class NotSupportedException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.bluez.Error.NotSupported'
- class NotPermittedException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.bluez.Error.NotPermitted'
- class InvalidValueLengthException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
- class FailedException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.bluez.Error.Failed'
- class Advertisement(dbus.service.Object):
- PATH_BASE = '/org/bluez/example/advertisement'
- def __init__(self, bus, index, advertising_type):
- self.path = self.PATH_BASE + str(index)
- self.bus = bus
- self.ad_type = advertising_type
- self.service_uuids = None
- self.manufacturer_data = None
- self.solicit_uuids = None
- self.service_data = None
- self.local_name = None
- self.include_tx_power = False
- self.data = None
- dbus.service.Object.__init__(self, bus, self.path)
- def get_properties(self):
- properties = dict()
- properties['Type'] = self.ad_type
- if self.service_uuids is not None:
- properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
- signature='s')
- if self.solicit_uuids is not None:
- properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
- signature='s')
- if self.manufacturer_data is not None:
- properties['ManufacturerData'] = dbus.Dictionary(
- self.manufacturer_data, signature='qv')
- if self.service_data is not None:
- properties['ServiceData'] = dbus.Dictionary(self.service_data,
- signature='sv')
- if self.local_name is not None:
- properties['LocalName'] = dbus.String(self.local_name)
- if self.include_tx_power:
- properties['Includes'] = dbus.Array(["tx-power"], signature='s')
- if self.data is not None:
- properties['Data'] = dbus.Dictionary(
- self.data, signature='yv')
- return {LE_ADVERTISEMENT_IFACE: properties}
- def get_path(self):
- return dbus.ObjectPath(self.path)
- def add_service_uuid(self, uuid):
- if not self.service_uuids:
- self.service_uuids = []
- self.service_uuids.append(uuid)
- def add_solicit_uuid(self, uuid):
- if not self.solicit_uuids:
- self.solicit_uuids = []
- self.solicit_uuids.append(uuid)
- def add_manufacturer_data(self, manuf_code, data):
- if not self.manufacturer_data:
- self.manufacturer_data = dbus.Dictionary({}, signature='qv')
- self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')
- def add_service_data(self, uuid, data):
- if not self.service_data:
- self.service_data = dbus.Dictionary({}, signature='sv')
- self.service_data[uuid] = dbus.Array(data, signature='y')
- def add_local_name(self, name):
- if not self.local_name:
- self.local_name = ""
- self.local_name = dbus.String(name)
- def add_data(self, ad_type, data):
- if not self.data:
- self.data = dbus.Dictionary({}, signature='yv')
- self.data[ad_type] = dbus.Array(data, signature='y')
- @dbus.service.method(DBUS_PROP_IFACE,
- in_signature='s',
- out_signature='a{sv}')
- def GetAll(self, interface):
- print('GetAll')
- if interface != LE_ADVERTISEMENT_IFACE:
- raise InvalidArgsException()
- print('returning props')
- return self.get_properties()[LE_ADVERTISEMENT_IFACE]
- @dbus.service.method(LE_ADVERTISEMENT_IFACE,
- in_signature='',
- out_signature='')
- def Release(self):
- print('%s: Released!' % self.path)
- class TestAdvertisement(Advertisement):
- def __init__(self, bus, index):
- Advertisement.__init__(self, bus, index, 'peripheral')
- self.add_service_uuid('180D')
- self.add_service_uuid('180F')
- self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03])
- self.add_service_data('9999', [0x00, 0x01, 0x02, 0x03, 0x04])
- self.add_local_name('TestAdvertisement')
- self.include_tx_power = True
- self.add_data(0x26, [0x01, 0x01, 0x00])
- def register_ad_cb():
- print('Advertisement registered')
- def register_ad_error_cb(error):
- print('Failed to register advertisement: ' + str(error))
- mainloop.quit()
- def find_adapter(bus):
- remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
- DBUS_OM_IFACE)
- objects = remote_om.GetManagedObjects()
- for o, props in objects.items():
- if LE_ADVERTISING_MANAGER_IFACE in props:
- return o
- return None
- def shutdown(timeout):
- print('Advertising for {} seconds...'.format(timeout))
- time.sleep(timeout)
- mainloop.quit()
- def main(timeout=0):
- global mainloop
- dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
- bus = dbus.SystemBus()
- adapter = find_adapter(bus)
- if not adapter:
- print('LEAdvertisingManager1 interface not found')
- return
- adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
- "org.freedesktop.DBus.Properties")
- adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
- ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
- LE_ADVERTISING_MANAGER_IFACE)
- test_advertisement = TestAdvertisement(bus, 0)
- mainloop = GObject.MainLoop()
- ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
- reply_handler=register_ad_cb,
- error_handler=register_ad_error_cb)
- if timeout > 0:
- threading.Thread(target=shutdown, args=(timeout,)).start()
- else:
- print('Advertising forever...')
- mainloop.run() # blocks until mainloop.quit() is called
- ad_manager.UnregisterAdvertisement(test_advertisement)
- print('Advertisement unregistered')
- dbus.service.Object.remove_from_connection(test_advertisement)
- if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--timeout', default=0, type=int, help="advertise " +
- "for this many seconds then stop, 0=run forever " +
- "(default: 0)")
- args = parser.parse_args()
- main(args.timeout)
|