| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- #!/usr/bin/python
- # SPDX-License-Identifier: LGPL-2.1-or-later
- from __future__ import absolute_import, print_function, unicode_literals
- from optparse import OptionParser
- import sys
- import dbus
- import dbus.service
- import dbus.mainloop.glib
- try:
- from gi.repository import GObject
- except ImportError:
- import gobject as GObject
- import bluezutils
- BUS_NAME = 'org.bluez'
- AGENT_INTERFACE = 'org.bluez.Agent1'
- AGENT_PATH = "/test/agent"
- bus = None
- device_obj = None
- dev_path = None
- def ask(prompt):
- try:
- return raw_input(prompt)
- except:
- return input(prompt)
- def set_trusted(path):
- props = dbus.Interface(bus.get_object("org.bluez", path),
- "org.freedesktop.DBus.Properties")
- props.Set("org.bluez.Device1", "Trusted", True)
- def dev_connect(path):
- dev = dbus.Interface(bus.get_object("org.bluez", path),
- "org.bluez.Device1")
- dev.Connect()
- class Rejected(dbus.DBusException):
- _dbus_error_name = "org.bluez.Error.Rejected"
- class Agent(dbus.service.Object):
- exit_on_release = True
- def set_exit_on_release(self, exit_on_release):
- self.exit_on_release = exit_on_release
- @dbus.service.method(AGENT_INTERFACE,
- in_signature="", out_signature="")
- def Release(self):
- print("Release")
- if self.exit_on_release:
- mainloop.quit()
- @dbus.service.method(AGENT_INTERFACE,
- in_signature="os", out_signature="")
- def AuthorizeService(self, device, uuid):
- print("AuthorizeService (%s, %s)" % (device, uuid))
- authorize = ask("Authorize connection (yes/no): ")
- if (authorize == "yes"):
- return
- raise Rejected("Connection rejected by user")
- @dbus.service.method(AGENT_INTERFACE,
- in_signature="o", out_signature="s")
- def RequestPinCode(self, device):
- print("RequestPinCode (%s)" % (device))
- set_trusted(device)
- return ask("Enter PIN Code: ")
- @dbus.service.method(AGENT_INTERFACE,
- in_signature="o", out_signature="u")
- def RequestPasskey(self, device):
- print("RequestPasskey (%s)" % (device))
- set_trusted(device)
- passkey = ask("Enter passkey: ")
- return dbus.UInt32(passkey)
- @dbus.service.method(AGENT_INTERFACE,
- in_signature="ouq", out_signature="")
- def DisplayPasskey(self, device, passkey, entered):
- print("DisplayPasskey (%s, %06u entered %u)" %
- (device, passkey, entered))
- @dbus.service.method(AGENT_INTERFACE,
- in_signature="os", out_signature="")
- def DisplayPinCode(self, device, pincode):
- print("DisplayPinCode (%s, %s)" % (device, pincode))
- @dbus.service.method(AGENT_INTERFACE,
- in_signature="ou", out_signature="")
- def RequestConfirmation(self, device, passkey):
- print("RequestConfirmation (%s, %06d)" % (device, passkey))
- confirm = ask("Confirm passkey (yes/no): ")
- if (confirm == "yes"):
- set_trusted(device)
- return
- raise Rejected("Passkey doesn't match")
- @dbus.service.method(AGENT_INTERFACE,
- in_signature="o", out_signature="")
- def RequestAuthorization(self, device):
- print("RequestAuthorization (%s)" % (device))
- auth = ask("Authorize? (yes/no): ")
- if (auth == "yes"):
- return
- raise Rejected("Pairing rejected")
- @dbus.service.method(AGENT_INTERFACE,
- in_signature="", out_signature="")
- def Cancel(self):
- print("Cancel")
- def pair_reply():
- print("Device paired")
- set_trusted(dev_path)
- dev_connect(dev_path)
- mainloop.quit()
- def pair_error(error):
- err_name = error.get_dbus_name()
- if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
- print("Timed out. Cancelling pairing")
- device_obj.CancelPairing()
- else:
- print("Creating device failed: %s" % (error))
- mainloop.quit()
- if __name__ == '__main__':
- dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
- bus = dbus.SystemBus()
- capability = "KeyboardDisplay"
- parser = OptionParser()
- parser.add_option("-i", "--adapter", action="store",
- type="string",
- dest="adapter_pattern",
- default=None)
- parser.add_option("-c", "--capability", action="store",
- type="string", dest="capability")
- parser.add_option("-t", "--timeout", action="store",
- type="int", dest="timeout",
- default=60000)
- (options, args) = parser.parse_args()
- if options.capability:
- capability = options.capability
- path = "/test/agent"
- agent = Agent(bus, path)
- mainloop = GObject.MainLoop()
- obj = bus.get_object(BUS_NAME, "/org/bluez");
- manager = dbus.Interface(obj, "org.bluez.AgentManager1")
- manager.RegisterAgent(path, capability)
- print("Agent registered")
- # Fix-up old style invocation (BlueZ 4)
- if len(args) > 0 and args[0].startswith("hci"):
- options.adapter_pattern = args[0]
- del args[:1]
- if len(args) > 0:
- device = bluezutils.find_device(args[0],
- options.adapter_pattern)
- dev_path = device.object_path
- agent.set_exit_on_release(False)
- device.Pair(reply_handler=pair_reply, error_handler=pair_error,
- timeout=60000)
- device_obj = device
- else:
- manager.RequestDefaultAgent(path)
- mainloop.run()
- #adapter.UnregisterAgent(path)
- #print("Agent unregistered")
|