test-discovery 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. #!/usr/bin/python
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. from optparse import OptionParser, make_option
  5. import dbus
  6. import dbus.mainloop.glib
  7. try:
  8. from gi.repository import GObject
  9. except ImportError:
  10. import gobject as GObject
  11. import bluezutils
  # State shared by the D-Bus signal handlers in this script.
  # devices maps an object path to the latest property dictionary seen for it.
  devices = dict()
  # compact selects one-line output in the handlers; the script entry point
  # switches it on when -c/--compact is given.
  compact = False
  def print_compact(address, properties):
      """Print a one-line summary of a device and mark it as logged.

      The line has the form ``<flag><address> <name>``. The flag is ``*`` when
      ``properties`` already carries the ``"Logged"`` marker (i.e. it was
      printed before) and a single space otherwise.

      Args:
          address: Not used for output. The printed address is always taken
              from ``properties["Address"]`` and falls back to
              ``"<unknown>"`` when that key is missing.
          properties: Mutable mapping of property names to values. As a side
              effect ``properties["Logged"]`` is set to ``True``.
      """
      shown = {"Name": "", "Address": "<unknown>"}
      for key in ("Name", "Address"):
          if key not in properties:
              continue
          value = properties[key]
          if isinstance(value, dbus.String):
              # ASCII round-trip with '?' substitution; works on Python 2 and 3
              # (unicode() does not exist on Python 3). Presumably here so that
              # print() cannot fail on terminals that are not UTF-8 capable.
              value = value.encode("ascii", "replace").decode("ascii")
          shown[key] = value

      flag = "*" if "Logged" in properties else " "
      print("%s%s %s" % (flag, shown["Address"], shown["Name"]))

      properties["Logged"] = True
  def print_normal(address, properties):
      """Print every property of a device and mark it as logged.

      Output is a ``[ <address> ]`` header, one ``<key> = <value>`` line per
      property (``Class`` is rendered as a zero-padded six-digit hex number),
      and a trailing blank line.

      Args:
          address: Text shown in the header line.
          properties: Mutable mapping of property names to values. As a side
              effect ``properties["Logged"]`` is set to ``True``; because the
              marker lives in the same mapping it is listed like any other
              property on later calls.
      """
      print("[ " + address + " ]")

      for key, value in properties.items():
          if key == "Class":
              print(" %s = 0x%06x" % (key, value))
              continue
          if isinstance(value, dbus.String):
              # ASCII round-trip with '?' substitution; works on Python 2 and 3
              # (unicode() does not exist on Python 3, and printing the encoded
              # bytes there would show a b'...' repr).
              value = value.encode("ascii", "replace").decode("ascii")
          print(" %s = %s" % (key, value))

      print()

      properties["Logged"] = True
  def skip_dev(old_dev, new_dev):
      """Tell whether an update for an already known device can be skipped.

      An update is skipped only when the cached entry ``old_dev`` has been
      printed before (it contains ``"Logged"``) and the update cannot improve
      on it: either a ``"Name"`` is already cached, or ``new_dev`` does not
      supply one.

      Returns:
          ``True`` to skip the update, ``False`` to process it.
      """
      return "Logged" in old_dev and (
          "Name" in old_dev or "Name" not in new_dev
      )
  def interfaces_added(path, interfaces):
      """Handle an ObjectManager ``InterfacesAdded`` signal.

      Caches the ``org.bluez.Device1`` properties of the new object in the
      module-level ``devices`` mapping (merging with any entry already cached
      for ``path``) and prints the result in compact or normal form.

      Args:
          path: Object path the signal refers to.
          interfaces: Mapping of interface name to that interface's property
              dictionary.

      Returns:
          None. Returns early when the signal carries no (or empty) Device1
          properties, or when compact mode decides via ``skip_dev`` that the
          update is not worth printing.
      """
      # The signal fires for every new object; use .get() so that objects
      # without a Device1 interface are ignored instead of raising KeyError.
      properties = interfaces.get("org.bluez.Device1")
      if not properties:
          return

      if path in devices:
          if compact and skip_dev(devices[path], properties):
              return
          # Copy-then-update keeps the "new values win" merge order and, unlike
          # items() + items(), also works with Python 3 dictionary views.
          merged = dict(devices[path])
          merged.update(properties)
          devices[path] = merged
      else:
          devices[path] = properties

      # Read the address from the same mapping that is checked, so a cached
      # address is used even when this signal does not repeat it.
      address = devices[path].get("Address", "<unknown>")

      if compact:
          print_compact(address, devices[path])
      else:
          print_normal(address, devices[path])
  def properties_changed(interface, changed, invalidated, path):
      """Handle a ``PropertiesChanged`` signal for a device object.

      Merges the changed properties into the module-level ``devices`` cache
      entry for ``path`` (creating it if needed) and prints the resulting
      device in compact or normal form.

      Args:
          interface: Name of the interface whose properties changed; anything
              other than ``org.bluez.Device1`` is ignored.
          changed: Mapping of changed property names to their new values.
          invalidated: Names of invalidated properties. Not used here.
          path: Object path of the emitting object.

      Returns:
          None. Returns early for other interfaces, or when compact mode
          decides via ``skip_dev`` that the update is not worth printing.
      """
      if interface != "org.bluez.Device1":
          return

      if path in devices:
          if compact and skip_dev(devices[path], changed):
              return
          # Copy-then-update keeps the "new values win" merge order and, unlike
          # items() + items(), also works with Python 3 dictionary views.
          merged = dict(devices[path])
          merged.update(changed)
          devices[path] = merged
      else:
          devices[path] = changed

      address = devices[path].get("Address", "<unknown>")

      if compact:
          print_compact(address, devices[path])
      else:
          print_normal(address, devices[path])
  if __name__ == '__main__':
      # Make the GLib main loop the default for D-Bus connections created
      # below; the signal handlers are dispatched from mainloop.run().
      dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

      bus = dbus.SystemBus()

      option_list = [
          make_option("-i", "--device", action="store",
                      type="string", dest="dev_id"),
          make_option("-u", "--uuids", action="store",
                      type="string", dest="uuids",
                      help="Filtered service UUIDs [uuid1,uuid2,...]"),
          make_option("-r", "--rssi", action="store",
                      type="int", dest="rssi",
                      help="RSSI threshold value"),
          make_option("-p", "--pathloss", action="store",
                      type="int", dest="pathloss",
                      help="Pathloss threshold value"),
          make_option("-t", "--transport", action="store",
                      type="string", dest="transport",
                      help="Type of scan to run (le/bredr/auto)"),
          make_option("-c", "--compact",
                      action="store_true", dest="compact"),
      ]
      parser = OptionParser(option_list=option_list)

      (options, args) = parser.parse_args()

      adapter = bluezutils.find_adapter(options.dev_id)

      if options.compact:
          compact = True

      bus.add_signal_receiver(interfaces_added,
                              dbus_interface="org.freedesktop.DBus.ObjectManager",
                              signal_name="InterfacesAdded")

      # arg0 restricts delivery to Device1 property changes; path_keyword makes
      # the emitting object's path arrive as the handler's "path" argument.
      bus.add_signal_receiver(properties_changed,
                              dbus_interface="org.freedesktop.DBus.Properties",
                              signal_name="PropertiesChanged",
                              arg0="org.bluez.Device1",
                              path_keyword="path")

      # Seed the cache with devices that already exist, without printing them.
      om = dbus.Interface(bus.get_object("org.bluez", "/"),
                          "org.freedesktop.DBus.ObjectManager")
      objects = om.GetManagedObjects()
      # items() instead of iteritems(): the latter is gone in Python 3.
      for path, interfaces in objects.items():
          if "org.bluez.Device1" in interfaces:
              devices[path] = interfaces["org.bluez.Device1"]

      scan_filter = dict()

      if options.uuids:
          scan_filter["UUIDs"] = options.uuids.split(',')

      # Compare against None rather than truthiness so that an explicit
      # threshold of 0 on the command line is not silently dropped.
      if options.rssi is not None:
          scan_filter["RSSI"] = dbus.Int16(options.rssi)

      if options.pathloss is not None:
          scan_filter["Pathloss"] = dbus.UInt16(options.pathloss)

      if options.transport:
          scan_filter["Transport"] = options.transport

      adapter.SetDiscoveryFilter(scan_filter)
      adapter.StartDiscovery()

      mainloop = GObject.MainLoop()
      mainloop.run()