test-health 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. #!/usr/bin/python
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. # -*- coding: utf-8 -*-
  5. import sys
  6. import dbus
  7. import dbus.service
  8. from dbus.mainloop.glib import DBusGMainLoop
  9. try:
  10. from gi.repository import GObject
  11. except ImportError:
  12. import gobject as GObject
  13. BUS_NAME = 'org.bluez'
  14. PATH = '/org/bluez'
  15. ADAPTER_INTERFACE = 'org.bluez.Adapter1'
  16. HEALTH_MANAGER_INTERFACE = 'org.bluez.HealthManager1'
  17. HEALTH_DEVICE_INTERFACE = 'org.bluez.HealthDevice1'
  18. DBusGMainLoop(set_as_default=True)
  19. loop = GObject.MainLoop()
  20. bus = dbus.SystemBus()
  21. def sig_received(*args, **kwargs):
  22. if "member" not in kwargs:
  23. return
  24. if "path" not in kwargs:
  25. return;
  26. sig_name = kwargs["member"]
  27. path = kwargs["path"]
  28. print(sig_name)
  29. print(path)
  30. if sig_name == "PropertyChanged":
  31. k, v = args
  32. print(k)
  33. print(v)
  34. else:
  35. ob = args[0]
  36. print(ob)
  37. def enter_mainloop():
  38. bus.add_signal_receiver(sig_received, bus_name=BUS_NAME,
  39. dbus_interface=HEALTH_DEVICE_INTERFACE,
  40. path_keyword="path",
  41. member_keyword="member",
  42. interface_keyword="interface")
  43. try:
  44. print("Entering main lopp, push Ctrl+C for finish")
  45. mainloop = GObject.MainLoop()
  46. mainloop.run()
  47. except KeyboardInterrupt:
  48. pass
  49. finally:
  50. print("Exiting, bye")
  51. hdp_manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
  52. HEALTH_MANAGER_INTERFACE)
  53. role = None
  54. while role == None:
  55. print("Select 1. source or 2. sink: ",)
  56. try:
  57. sel = int(sys.stdin.readline())
  58. if sel == 1:
  59. role = "source"
  60. elif sel == 2:
  61. role = "sink"
  62. else:
  63. raise ValueError
  64. except (TypeError, ValueError):
  65. print("Wrong selection, try again: ",)
  66. except KeyboardInterrupt:
  67. sys.exit()
  68. dtype = None
  69. while dtype == None:
  70. print("Select a data type: ",)
  71. try:
  72. sel = int(sys.stdin.readline())
  73. if (sel < 0) or (sel > 65535):
  74. raise ValueError
  75. dtype = sel;
  76. except (TypeError, ValueError):
  77. print("Wrong selection, try again: ",)
  78. except KeyboardInterrupt:
  79. sys.exit()
  80. pref = None
  81. if role == "source":
  82. while pref == None:
  83. try:
  84. print("Select a preferred data channel type 1.",)
  85. print("reliable 2. streaming: ",)
  86. sel = int(sys.stdin.readline())
  87. if sel == 1:
  88. pref = "reliable"
  89. elif sel == 2:
  90. pref = "streaming"
  91. else:
  92. raise ValueError
  93. except (TypeError, ValueError):
  94. print("Wrong selection, try again")
  95. except KeyboardInterrupt:
  96. sys.exit()
  97. app_path = hdp_manager.CreateApplication({
  98. "DataType": dbus.types.UInt16(dtype),
  99. "Role": role,
  100. "Description": "Test Source",
  101. "ChannelType": pref})
  102. else:
  103. app_path = hdp_manager.CreateApplication({
  104. "DataType": dbus.types.UInt16(dtype),
  105. "Description": "Test sink",
  106. "Role": role})
  107. print("New application created:", app_path)
  108. con = None
  109. while con == None:
  110. try:
  111. print("Connect to a remote device (y/n)? ",)
  112. sel = sys.stdin.readline()
  113. if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
  114. con = True
  115. elif sel in ("n\n", "no\n", "N\n", "NO\n"):
  116. con = False
  117. else:
  118. print("Wrong selection, try again.")
  119. except KeyboardInterrupt:
  120. sys.exit()
  121. if not con:
  122. enter_mainloop()
  123. sys.exit()
  124. manager = dbus.Interface(bus.get_object(BUS_NAME, "/"),
  125. "org.freedesktop.DBus.ObjectManager")
  126. objects = manager.GetManagedObjects()
  127. adapters = []
  128. for path, ifaces in objects.iteritems():
  129. if ifaces.has_key(ADAPTER_INTERFACE):
  130. adapters.append(path)
  131. i = 1
  132. for ad in adapters:
  133. print("%d. %s" % (i, ad))
  134. i = i + 1
  135. print("Select an adapter: ",)
  136. select = None
  137. while select == None:
  138. try:
  139. pos = int(sys.stdin.readline()) - 1
  140. if pos < 0:
  141. raise TypeError
  142. select = adapters[pos]
  143. except (TypeError, IndexError, ValueError):
  144. print("Wrong selection, try again: ",)
  145. except KeyboardInterrupt:
  146. sys.exit()
  147. adapter = dbus.Interface(bus.get_object(BUS_NAME, select), ADAPTER_INTERFACE)
  148. devices = []
  149. for path, interfaces in objects.iteritems():
  150. if "org.bluez.Device1" not in interfaces:
  151. continue
  152. properties = interfaces["org.bluez.Device1"]
  153. if properties["Adapter"] != select:
  154. continue;
  155. if HEALTH_DEVICE_INTERFACE not in interfaces:
  156. continue
  157. devices.append(path)
  158. if len(devices) == 0:
  159. print("No devices available")
  160. sys.exit()
  161. i = 1
  162. for dev in devices:
  163. print("%d. %s" % (i, dev))
  164. i = i + 1
  165. print("Select a device: ",)
  166. select = None
  167. while select == None:
  168. try:
  169. pos = int(sys.stdin.readline()) - 1
  170. if pos < 0:
  171. raise TypeError
  172. select = devices[pos]
  173. except (TypeError, IndexError, ValueError):
  174. print("Wrong selection, try again: ",)
  175. except KeyboardInterrupt:
  176. sys.exit()
  177. device = dbus.Interface(bus.get_object(BUS_NAME, select),
  178. HEALTH_DEVICE_INTERFACE)
  179. echo = None
  180. while echo == None:
  181. try:
  182. print("Perform an echo (y/n)? ",)
  183. sel = sys.stdin.readline()
  184. if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
  185. echo = True
  186. elif sel in ("n\n", "no\n", "N\n", "NO\n"):
  187. echo = False
  188. else:
  189. print("Wrong selection, try again.")
  190. except KeyboardInterrupt:
  191. sys.exit()
  192. if echo:
  193. if device.Echo():
  194. print("Echo was ok")
  195. else:
  196. print("Echo war wrong, exiting")
  197. sys.exit()
  198. print("Connecting to device %s" % (select))
  199. if role == "source":
  200. chan = device.CreateChannel(app_path, "reliable")
  201. else:
  202. chan = device.CreateChannel(app_path, "any")
  203. print(chan)
  204. enter_mainloop()
  205. hdp_manager.DestroyApplication(app_path)