| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- #!/usr/bin/python
- # SPDX-License-Identifier: LGPL-2.1-or-later
- from __future__ import absolute_import, print_function, unicode_literals
- from optparse import OptionParser, make_option
- import os
- from socket import SOCK_SEQPACKET, socket
- import sys
- import dbus
- import dbus.service
- import dbus.mainloop.glib
- import glib
- try:
- from gi.repository import GObject
- except ImportError:
- import gobject as GObject
# Main loop handle. It is assigned in the script entry point and stopped by
# HfpProfile.Release().
mainloop = None

# SCO audio needs the Bluetooth socket constants; they are missing when the
# interpreter was built without Bluetooth support.
audio_supported = True
try:
    from socket import AF_BLUETOOTH, BTPROTO_SCO
except ImportError:
    # Only a failed import means "no Bluetooth support"; anything else
    # (e.g. KeyboardInterrupt) must not be swallowed here.
    print("WARNING: python compiled without Bluetooth support"
          " - audio will not be available")
    audio_supported = False

# Maximum number of bytes fetched by one os.read() on a connection.
BUF_SIZE = 1024

# Address the listening SCO socket is bound to.
BDADDR_ANY = '00:00:00:00:00:00'

# HF_* bits: local feature mask, sent with AT+BRSF and registered as the
# profile's "Features" option.
HF_NREC = 0x0001
HF_3WAY = 0x0002
HF_CLI = 0x0004
HF_VOICE_RECOGNITION = 0x0008
HF_REMOTE_VOL = 0x0010
HF_ENHANCED_STATUS = 0x0020
HF_ENHANCED_CONTROL = 0x0040
HF_CODEC_NEGOTIATION = 0x0080

# AG_* bits: tested against the peer feature mask (the "Features" connection
# property, later replaced by the value of the +BRSF response).
AG_3WAY = 0x0001
AG_NREC = 0x0002
AG_VOICE_RECOGNITION = 0x0004
AG_INBAND_RING = 0x0008
AG_VOICE_TAG = 0x0010
AG_REJECT_CALL = 0x0020
AG_ENHANCED_STATUS = 0x0040
AG_ENHANCED_CONTROL = 0x0080
AG_EXTENDED_RESULT = 0x0100
AG_CODEC_NEGOTIATION = 0x0200

# Every HF_* bit except HF_NREC.
HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
               HF_REMOTE_VOL | HF_ENHANCED_STATUS |
               HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)

# Codec list sent as the argument of AT+BAC.
AVAIL_CODECS = "1,2"
class HfpConnection:
    """State of one connection plus the AT-command SLC setup sequence.

    The constructor installs a GLib input watch on the descriptor and sends
    the first command.  Each "OK"/"ERROR" reply then triggers the next
    command in this order:

        AT+BRSF -> [AT+BAC] -> AT+CIND=? -> AT+CIND? -> AT+CMER -> [AT+CHLD=?]

    The bracketed steps are sent only when both the local and the peer
    feature masks carry the matching bit.
    """

    slc_complete = False    # True once the setup sequence has finished
    fd = None               # connection descriptor; -1 after disconnect()
    io_id = 0               # GLib watch id; 0 when no watch is installed
    version = 0
    features = 0            # peer feature mask (AG_* bits)
    pending = None          # command sent but not yet answered

    def __init__(self, fd, version, features):
        """Store the connection details, watch *fd* and start the setup.

        fd       -- descriptor used for os.read()/os.write().
        version  -- profile version, printed as a 16-bit hex value.
        features -- initial peer feature mask; replaced when a +BRSF
                    response is received.
        """
        self.fd = fd
        self.version = version
        self.features = features

        print("Version 0x%04x Features 0x%04x" % (version, features))

        self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)

        self.slc_next_cmd(None)

    def disconnect(self):
        """Close the descriptor and remove its watch; no-op if already done."""
        # fd is None on a never-initialised instance; comparing None with an
        # int raises TypeError on Python 3.
        if self.fd is None or self.fd < 0:
            return

        os.close(self.fd)
        self.fd = -1

        # io_cb() clears io_id when it drops the watch itself on end-of-file.
        if self.io_id:
            glib.source_remove(self.io_id)
            self.io_id = 0

    def slc_completed(self):
        """Mark the setup sequence as finished."""
        print("SLC establisment complete")
        self.slc_complete = True

    def slc_next_cmd(self, cmd):
        """Send the command that follows *cmd* in the setup sequence.

        cmd -- the command that was just answered, or a false value to
               start the sequence from AT+BRSF.
        """
        if not cmd:
            self.send_cmd("AT+BRSF=%u" % (HF_FEATURES))
        elif cmd.startswith("AT+BRSF"):
            # Codec list is only sent when both sides support negotiation.
            if (self.features & AG_CODEC_NEGOTIATION and
                    HF_FEATURES & HF_CODEC_NEGOTIATION):
                self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS))
            else:
                self.send_cmd("AT+CIND=?")
        elif cmd.startswith("AT+BAC"):
            self.send_cmd("AT+CIND=?")
        elif cmd.startswith("AT+CIND=?"):
            self.send_cmd("AT+CIND?")
        elif cmd.startswith("AT+CIND?"):
            self.send_cmd("AT+CMER=3,0,0,1")
        elif cmd.startswith("AT+CMER="):
            if HF_FEATURES & HF_3WAY and self.features & AG_3WAY:
                self.send_cmd("AT+CHLD=?")
            else:
                self.slc_completed()
        elif cmd.startswith("AT+CHLD=?"):
            self.slc_completed()
        else:
            print("Unknown SLC command completed: %s" % (cmd))

    def io_cb(self, fd, cond):
        """GLib watch callback: read pending data and process each line.

        Returns True to keep the watch installed, or False once the read
        returns no data so that the watch is removed instead of firing
        forever on a closed connection.
        """
        data = os.read(fd, BUF_SIZE)
        if not data:
            print("Connection closed by peer")
            # The watch is removed by returning False; forget its id so
            # disconnect() does not try to remove it a second time.
            self.io_id = 0
            return False

        # os.read() yields bytes; decode once so the comparisons below work
        # on text under both Python 2 and Python 3.
        text = data.decode("utf-8", "replace").strip()

        print("Received: %s" % (text))

        # One read may carry several responses (e.g. "+BRSF: n" and "OK").
        for line in text.splitlines():
            line = line.strip()
            if line:
                self._handle_line(line)

        return True

    def _handle_line(self, line):
        """Process one stripped, non-empty response line."""
        if line in ("OK", "ERROR"):
            cmd = self.pending
            self.pending = None
            # An ERROR reply advances the sequence just like OK does.
            if not self.slc_complete:
                self.slc_next_cmd(cmd)
            return

        name, sep, value = line.partition(":")
        if name == "+BRSF" and sep:
            try:
                self.features = int(value)
            except ValueError:
                print("Ignoring malformed +BRSF response: %s" % (line))

    def send_cmd(self, cmd):
        """Write *cmd* followed by CR LF, unless a command is still pending."""
        if self.pending:
            print("ERROR: Another command is pending")
            return

        print("Sending: %s" % (cmd))

        # os.write() requires bytes on Python 3.
        os.write(self.fd, (cmd + "\r\n").encode("utf-8"))
        self.pending = cmd
class HfpProfile(dbus.service.Object):
    """D-Bus object exporting the org.bluez.Profile1 methods.

    Keeps one HfpConnection per object path handed to NewConnection() and,
    when a listening SCO socket is supplied, watches it for incoming
    connections.
    """

    sco_socket = None   # listening SCO socket, when one was supplied
    io_id = 0           # GLib watch id for sco_socket

    def __init__(self, bus, path, sco):
        """Export the object at *path* on *bus*.

        sco -- listening socket to watch for SCO connections, or a false
               value to skip audio handling.
        """
        dbus.service.Object.__init__(self, bus, path)

        # Per-instance map of object path -> HfpConnection.  A class-level
        # dict would be shared by every HfpProfile instance.
        self.conns = {}

        if sco:
            self.init_sco(sco)

    def sco_cb(self, sock, cond):
        """GLib watch callback: accept one SCO connection and report it."""
        # The accepted socket is not stored anywhere by this class.
        (sco, peer) = sock.accept()
        print("New SCO connection from %s" % (peer))
        # Keep the watch installed; a false return value would remove it
        # after the first connection.
        return True

    def init_sco(self, sock):
        """Remember the listening socket and watch it for input."""
        self.sco_socket = sock
        self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="", out_signature="")
    def Release(self):
        """Stop the module-level main loop."""
        print("Release")
        mainloop.quit()

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="", out_signature="")
    def Cancel(self):
        """Log the cancellation; nothing else is done."""
        print("Cancel")

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="o", out_signature="")
    def RequestDisconnection(self, path):
        """Drop and disconnect the connection registered for *path*."""
        conn = self.conns.pop(path, None)
        if conn is None:
            # Unknown path: nothing to tear down, so do not raise KeyError.
            print("RequestDisconnection: no connection for %s" % (path))
            return
        conn.disconnect()

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="oha{sv}", out_signature="")
    def NewConnection(self, path, fd, properties):
        """Create an HfpConnection for the descriptor passed with *path*.

        "Version" and "Features" are read from *properties* when present;
        otherwise 0x0105 and 0 are used.
        """
        fd = fd.take()

        print("NewConnection(%s, %d)" % (path, fd))

        version = properties.get("Version", 0x0105)
        features = properties.get("Features", 0)

        self.conns[path] = HfpConnection(fd, version, features)
if __name__ == '__main__':
    # Install the GLib main loop as the dbus-python default, then open the
    # system bus and get the profile manager interface.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()
    bluez_root = bus.get_object("org.bluez", "/org/bluez")
    manager = dbus.Interface(bluez_root, "org.bluez.ProfileManager1")

    # Command line: object path, optional profile name, optional channel.
    parser = OptionParser(option_list=[
        make_option("-p", "--path", action="store",
                    type="string", dest="path",
                    default="/bluez/test/hfp"),
        make_option("-n", "--name", action="store",
                    type="string", dest="name",
                    default=None),
        make_option("-C", "--channel", action="store",
                    type="int", dest="channel",
                    default=None),
    ])
    options, args = parser.parse_args()

    mainloop = GObject.MainLoop()

    # Registration options; "Name" and "Channel" are added only when given.
    opts = {
        "Version": dbus.UInt16(0x0106),
        "Features": dbus.UInt16(HF_FEATURES),
    }
    if options.name:
        opts["Name"] = options.name
    if options.channel is not None:
        opts["Channel"] = dbus.UInt16(options.channel)

    # Listening SCO socket, created only when Bluetooth sockets exist.
    sco = None
    if audio_supported:
        sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
        sco.bind(BDADDR_ANY)
        sco.listen(1)

    profile = HfpProfile(bus, options.path, sco)

    manager.RegisterProfile(options.path, "hfp-hf", opts)

    print("Profile registered - waiting for connections")

    mainloop.run()
|