test-hfp 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. #!/usr/bin/python
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. from optparse import OptionParser, make_option
  5. import os
  6. from socket import SOCK_SEQPACKET, socket
  7. import sys
  8. import dbus
  9. import dbus.service
  10. import dbus.mainloop.glib
  11. import glib
  12. try:
  13. from gi.repository import GObject
  14. except ImportError:
  15. import gobject as GObject
  16. mainloop = None
  17. audio_supported = True
  18. try:
  19. from socket import AF_BLUETOOTH, BTPROTO_SCO
  20. except:
  21. print("WARNING: python compiled without Bluetooth support"
  22. " - audio will not be available")
  23. audio_supported = False
  24. BUF_SIZE = 1024
  25. BDADDR_ANY = '00:00:00:00:00:00'
  26. HF_NREC = 0x0001
  27. HF_3WAY = 0x0002
  28. HF_CLI = 0x0004
  29. HF_VOICE_RECOGNITION = 0x0008
  30. HF_REMOTE_VOL = 0x0010
  31. HF_ENHANCED_STATUS = 0x0020
  32. HF_ENHANCED_CONTROL = 0x0040
  33. HF_CODEC_NEGOTIATION = 0x0080
  34. AG_3WAY = 0x0001
  35. AG_NREC = 0x0002
  36. AG_VOICE_RECOGNITION = 0x0004
  37. AG_INBAND_RING = 0x0008
  38. AG_VOICE_TAG = 0x0010
  39. AG_REJECT_CALL = 0x0020
  40. AG_ENHANCED_STATUS = 0x0040
  41. AG_ENHANCED_CONTROL = 0x0080
  42. AG_EXTENDED_RESULT = 0x0100
  43. AG_CODEC_NEGOTIATION = 0x0200
  44. HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
  45. HF_REMOTE_VOL | HF_ENHANCED_STATUS |
  46. HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)
  47. AVAIL_CODECS = "1,2"
  48. class HfpConnection:
  49. slc_complete = False
  50. fd = None
  51. io_id = 0
  52. version = 0
  53. features = 0
  54. pending = None
  55. def disconnect(self):
  56. if (self.fd >= 0):
  57. os.close(self.fd)
  58. self.fd = -1
  59. glib.source_remove(self.io_id)
  60. self.io_id = 0
  61. def slc_completed(self):
  62. print("SLC establisment complete")
  63. self.slc_complete = True
  64. def slc_next_cmd(self, cmd):
  65. if not cmd:
  66. self.send_cmd("AT+BRSF=%u" % (HF_FEATURES))
  67. elif (cmd.startswith("AT+BRSF")):
  68. if (self.features & AG_CODEC_NEGOTIATION and
  69. HF_FEATURES & HF_CODEC_NEGOTIATION):
  70. self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS))
  71. else:
  72. self.send_cmd("AT+CIND=?")
  73. elif (cmd.startswith("AT+BAC")):
  74. self.send_cmd("AT+CIND=?")
  75. elif (cmd.startswith("AT+CIND=?")):
  76. self.send_cmd("AT+CIND?")
  77. elif (cmd.startswith("AT+CIND?")):
  78. self.send_cmd("AT+CMER=3,0,0,1")
  79. elif (cmd.startswith("AT+CMER=")):
  80. if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY):
  81. self.send_cmd("AT+CHLD=?")
  82. else:
  83. self.slc_completed()
  84. elif (cmd.startswith("AT+CHLD=?")):
  85. self.slc_completed()
  86. else:
  87. print("Unknown SLC command completed: %s" % (cmd))
  88. def io_cb(self, fd, cond):
  89. buf = os.read(fd, BUF_SIZE)
  90. buf = buf.strip()
  91. print("Received: %s" % (buf))
  92. if (buf == "OK" or buf == "ERROR"):
  93. cmd = self.pending
  94. self.pending = None
  95. if (not self.slc_complete):
  96. self.slc_next_cmd(cmd)
  97. return True
  98. parts = buf.split(':')
  99. if (parts[0] == "+BRSF"):
  100. self.features = int(parts[1])
  101. return True
  102. def send_cmd(self, cmd):
  103. if (self.pending):
  104. print("ERROR: Another command is pending")
  105. return
  106. print("Sending: %s" % (cmd))
  107. os.write(self.fd, cmd + "\r\n")
  108. self.pending = cmd
  109. def __init__(self, fd, version, features):
  110. self.fd = fd
  111. self.version = version
  112. self.features = features
  113. print("Version 0x%04x Features 0x%04x" % (version, features))
  114. self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)
  115. self.slc_next_cmd(None)
  116. class HfpProfile(dbus.service.Object):
  117. sco_socket = None
  118. io_id = 0
  119. conns = {}
  120. def sco_cb(self, sock, cond):
  121. (sco, peer) = sock.accept()
  122. print("New SCO connection from %s" % (peer))
  123. def init_sco(self, sock):
  124. self.sco_socket = sock
  125. self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)
  126. def __init__(self, bus, path, sco):
  127. dbus.service.Object.__init__(self, bus, path)
  128. if sco:
  129. self.init_sco(sco)
  130. @dbus.service.method("org.bluez.Profile1",
  131. in_signature="", out_signature="")
  132. def Release(self):
  133. print("Release")
  134. mainloop.quit()
  135. @dbus.service.method("org.bluez.Profile1",
  136. in_signature="", out_signature="")
  137. def Cancel(self):
  138. print("Cancel")
  139. @dbus.service.method("org.bluez.Profile1",
  140. in_signature="o", out_signature="")
  141. def RequestDisconnection(self, path):
  142. conn = self.conns.pop(path)
  143. conn.disconnect()
  144. @dbus.service.method("org.bluez.Profile1",
  145. in_signature="oha{sv}", out_signature="")
  146. def NewConnection(self, path, fd, properties):
  147. fd = fd.take()
  148. version = 0x0105
  149. features = 0
  150. print("NewConnection(%s, %d)" % (path, fd))
  151. for key in properties.keys():
  152. if key == "Version":
  153. version = properties[key]
  154. elif key == "Features":
  155. features = properties[key]
  156. conn = HfpConnection(fd, version, features)
  157. self.conns[path] = conn
  158. if __name__ == '__main__':
  159. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  160. bus = dbus.SystemBus()
  161. manager = dbus.Interface(bus.get_object("org.bluez",
  162. "/org/bluez"), "org.bluez.ProfileManager1")
  163. option_list = [
  164. make_option("-p", "--path", action="store",
  165. type="string", dest="path",
  166. default="/bluez/test/hfp"),
  167. make_option("-n", "--name", action="store",
  168. type="string", dest="name",
  169. default=None),
  170. make_option("-C", "--channel", action="store",
  171. type="int", dest="channel",
  172. default=None),
  173. ]
  174. parser = OptionParser(option_list=option_list)
  175. (options, args) = parser.parse_args()
  176. mainloop = GObject.MainLoop()
  177. opts = {
  178. "Version" : dbus.UInt16(0x0106),
  179. "Features" : dbus.UInt16(HF_FEATURES),
  180. }
  181. if (options.name):
  182. opts["Name"] = options.name
  183. if (options.channel is not None):
  184. opts["Channel"] = dbus.UInt16(options.channel)
  185. if audio_supported:
  186. sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
  187. sco.bind(BDADDR_ANY)
  188. sco.listen(1)
  189. else:
  190. sco = None
  191. profile = HfpProfile(bus, options.path, sco)
  192. manager.RegisterProfile(options.path, "hfp-hf", opts)
  193. print("Profile registered - waiting for connections")
  194. mainloop.run()