| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- #!/usr/bin/python
- # SPDX-License-Identifier: LGPL-2.1-or-later
- from __future__ import absolute_import, print_function, unicode_literals
- import os
- import sys
- import dbus
- import dbus.service
- import dbus.mainloop.glib
- try:
- from gi.repository import GObject
- except ImportError:
- import gobject as GObject
# Well-known bus name and root object path of the OBEX service.
BUS_NAME = 'org.bluez.obex'
PATH = '/org/bluez/obex'

# D-Bus interface names used by this script.
CLIENT_INTERFACE = 'org.bluez.obex.Client1'
SESSION_INTERFACE = 'org.bluez.obex.Session1'
PHONEBOOK_ACCESS_INTERFACE = 'org.bluez.obex.PhonebookAccess1'
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
class Transfer:
    """Record describing one requested phonebook transfer.

    Holds the callable to invoke with the transfer's result together with
    the transfer's object path and local file name.
    """

    def __init__(self, callback_func):
        """Remember *callback_func*; path and file name start out unset."""
        # Both stay None until PbapClient.register() fills them in from the
        # reply to the Pull/PullAll request.
        self.path = self.filename = None
        self.callback_func = callback_func
class PbapClient:
    """Wrapper around an OBEX PhonebookAccess1 session that tracks transfers.

    Requests are issued asynchronously with pull() / pull_all().  Each
    request is counted in ``self.transfers`` and, once the service replies,
    recorded in ``self.props`` under the transfer's object path.  A
    ``PropertiesChanged`` signal carrying ``Status`` then completes or
    aborts the transfer.

    The error paths call ``mainloop.quit()`` on the module-level
    ``mainloop`` that the script entry point creates.
    """

    def __init__(self, session_path):
        """Attach to the session object at *session_path* on the session bus."""
        # Requests issued through pull()/pull_all() that have not completed.
        self.transfers = 0
        # Transfer object path -> transfer record, filled in by register().
        self.props = dict()
        # One-shot callable run when the last outstanding transfer completes.
        self.flush_func = None

        bus = dbus.SessionBus()
        obj = bus.get_object(BUS_NAME, session_path)
        self.session = dbus.Interface(obj, SESSION_INTERFACE)
        self.pbap = dbus.Interface(obj, PHONEBOOK_ACCESS_INTERFACE)
        # No path filter is given, so properties_changed() receives the
        # signal for every object and does its own filtering by path.
        bus.add_signal_receiver(self.properties_changed,
                                dbus_interface="org.freedesktop.DBus.Properties",
                                signal_name="PropertiesChanged",
                                path_keyword="path")

    def _is_idle(self):
        """Return True when no request is pending and no transfer is tracked."""
        return not self.props and self.transfers == 0

    def register(self, path, properties, transfer):
        """Reply handler: start tracking *transfer* under object *path*.

        *properties* must contain a ``"Filename"`` entry; it is stored on the
        transfer record and read back by transfer_complete().
        """
        transfer.path = path
        transfer.filename = properties["Filename"]
        self.props[path] = transfer
        print("Transfer created: %s (file %s)" % (path,
                                                  transfer.filename))

    def error(self, err):
        """Error handler for a failed request: print *err* and stop the loop."""
        print(err)
        mainloop.quit()

    def transfer_complete(self, path):
        """Deliver the finished transfer at *path* to its callback.

        The transfer's file is read, deleted, and its lines are passed to
        the transfer's callback.  The transfer is always dropped from the
        tracking table, even when the file cannot be read, so that a pending
        flush callback is not blocked forever by one bad transfer.  Unknown
        paths are ignored.
        """
        req = self.props.pop(path, None)
        if req is None:
            return
        self.transfers -= 1
        print("Transfer %s complete" % path)

        lines = None
        try:
            with open(req.filename, "r") as result_file:
                lines = result_file.readlines()
        except (IOError, OSError) as err:
            print("Transfer %s: cannot read %s: %s"
                  % (path, req.filename, err))

        if lines is not None:
            try:
                os.remove(req.filename)
            except OSError as err:
                # Failing to delete the file must not lose the data read.
                print("Transfer %s: cannot remove %s: %s"
                      % (path, req.filename, err))
            try:
                req.callback_func(lines)
            except Exception as err:
                # Best effort: a failing callback is reported but must not
                # prevent the flush callback below from running.
                print("Transfer %s: callback failed: %s" % (path, err))

        if self._is_idle() and self.flush_func is not None:
            # Clear the slot first: the callback may install a new one.
            callback, self.flush_func = self.flush_func, None
            callback()

    def transfer_error(self, path):
        """Report a failed transfer at *path* and stop the main loop."""
        print("Transfer %s error" % path)
        mainloop.quit()

    def properties_changed(self, interface, properties, invalidated, path):
        """Signal handler for ``PropertiesChanged``.

        Acts only on tracked transfer paths and only when the changed
        properties include ``Status``; updates that do not carry ``Status``
        are ignored rather than raising ``KeyError``.
        """
        if path not in self.props:
            return

        status = properties.get('Status')
        if status == 'complete':
            self.transfer_complete(path)
        elif status == 'error':
            self.transfer_error(path)

    def pull(self, vcard, params, func):
        """Asynchronously request *vcard*; *func* receives the file's lines."""
        req = Transfer(func)
        self.pbap.Pull(vcard, "", params,
                       reply_handler=lambda o, p: self.register(o, p, req),
                       error_handler=self.error)
        self.transfers += 1

    def pull_all(self, params, func):
        """Asynchronously request the whole phonebook; *func* gets its lines."""
        req = Transfer(func)
        self.pbap.PullAll("", params,
                          reply_handler=lambda o, p: self.register(o, p, req),
                          error_handler=self.error)
        self.transfers += 1

    def flush_transfers(self, func):
        """Arrange for *func* to run once all outstanding transfers finish.

        NOTE: when nothing is outstanding at the time of the call, *func* is
        neither stored nor called.
        """
        if self._is_idle():
            return
        self.flush_func = func

    def interface(self):
        """Return the PhonebookAccess1 interface proxy of this session."""
        return self.pbap
def process_result(lines, header):
    """Print *header* (unless it is None), then *lines*, then a blank line.

    Each entry of *lines* is written as-is with no newline appended, since
    lines produced by ``readlines()`` already end with their own newline.
    """
    if header is not None:
        print(header)
    for line in lines:
        print(line, end='')
    print()


if __name__ == '__main__':
    # Validate the command line before touching D-Bus at all.
    if len(sys.argv) < 2:
        print("Usage: %s <device>" % (sys.argv[0]))
        sys.exit(1)

    # Must be installed before the bus connection is created so that
    # signals and asynchronous replies are dispatched by the GLib loop.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    mainloop = GObject.MainLoop()

    client = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                            CLIENT_INTERFACE)

    print("Creating Session")
    session_path = client.CreateSession(sys.argv[1], {"Target": "PBAP"})

    pbap_client = PbapClient(session_path)

    def test_paths(paths):
        """Exercise the first phonebook in *paths*, then chain to the rest.

        Selects the phonebook, prints its size and listing, pulls every
        listed vCard plus the whole phonebook, and schedules itself for the
        remaining paths once those transfers have finished.  With an empty
        *paths* it prints FINISHED and stops the main loop.
        """
        if not paths:
            print()
            print("FINISHED")
            mainloop.quit()
            return

        path = paths[0]

        print("\n--- Select Phonebook %s ---\n" % (path))
        pbap_client.interface().Select("int", path)

        print("\n--- GetSize ---\n")
        ret = pbap_client.interface().GetSize()
        print("Size = %d\n" % (ret))

        print("\n--- List vCard ---\n")
        try:
            ret = pbap_client.interface().List(dbus.Dictionary())
        except dbus.DBusException as err:
            # Best effort: a failed listing still allows the PullAll below.
            print("List failed: %s" % err)
            ret = []

        params = dbus.Dictionary({"Format": "vcard30",
                                  "Fields": ["PHOTO"]})
        for item in ret:
            print("%s : %s" % (item[0], item[1]))
            pbap_client.pull(item[0], params,
                             lambda x: process_result(x, None))

        pbap_client.pull_all(params,
                             lambda x: process_result(x,
                                                      "\n--- PullAll ---\n"))

        # Continue with the remaining phonebooks only after every transfer
        # requested above has completed.
        pbap_client.flush_transfers(lambda: test_paths(paths[1:]))

    test_paths(["PB", "ICH", "OCH", "MCH", "CCH", "SPD", "FAV"])

    mainloop.run()
|