opp-client 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. #!/usr/bin/python
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. from optparse import OptionParser
  5. import os.path
  6. import sys
  7. import dbus
  8. import dbus.mainloop.glib
  9. try:
  10. from gi.repository import GObject
  11. except ImportError:
  12. import gobject as GObject
  13. BUS_NAME='org.bluez.obex'
  14. PATH = '/org/bluez/obex'
  15. CLIENT_INTERFACE='org.bluez.obex.Client1'
  16. SESSION_INTERFACE='org.bluez.obex.Session1'
  17. OBJECT_PUSH_INTERFACE='org.bluez.obex.ObjectPush1'
  18. TRANSFER_INTERFACE='org.bluez.obex.Transfer1'
  19. def parse_options():
  20. parser.add_option("-d", "--device", dest="device",
  21. help="Device to connect", metavar="DEVICE")
  22. parser.add_option("-p", "--pull", dest="pull_to_file",
  23. help="Pull vcard and store in FILE", metavar="FILE")
  24. parser.add_option("-s", "--send", dest="send_file",
  25. help="Send FILE", metavar="FILE")
  26. parser.add_option("-v", "--verbose", action="store_true",
  27. dest="verbose")
  28. return parser.parse_args()
  29. class OppClient:
  30. def __init__(self, session_path, verbose=False):
  31. self.transferred = 0
  32. self.transfer_path = None
  33. self.verbose = verbose
  34. bus = dbus.SessionBus()
  35. obj = bus.get_object(BUS_NAME, session_path)
  36. self.session = dbus.Interface(obj, SESSION_INTERFACE)
  37. self.opp = dbus.Interface(obj, OBJECT_PUSH_INTERFACE)
  38. bus.add_signal_receiver(self.properties_changed,
  39. dbus_interface="org.freedesktop.DBus.Properties",
  40. signal_name="PropertiesChanged",
  41. path_keyword="path")
  42. def create_transfer_reply(self, path, properties):
  43. self.transfer_path = path
  44. self.transfer_size = properties["Size"]
  45. if self.verbose:
  46. print("Transfer created: %s" % path)
  47. def error(self, err):
  48. print(err)
  49. mainloop.quit()
  50. def properties_changed(self, interface, properties, invalidated, path):
  51. if path != self.transfer_path:
  52. return
  53. if "Status" in properties and \
  54. (properties["Status"] == "complete" or \
  55. properties["Status"] == "error"):
  56. if self.verbose:
  57. print("Transfer %s" % properties["Status"])
  58. mainloop.quit()
  59. return
  60. if "Transferred" not in properties:
  61. return
  62. value = properties["Transferred"]
  63. speed = (value - self.transferred) / 1000
  64. print("Transfer progress %d/%d at %d kBps" % (value,
  65. self.transfer_size,
  66. speed))
  67. self.transferred = value
  68. def pull_business_card(self, filename):
  69. self.opp.PullBusinessCard(os.path.abspath(filename),
  70. reply_handler=self.create_transfer_reply,
  71. error_handler=self.error)
  72. def send_file(self, filename):
  73. self.opp.SendFile(os.path.abspath(filename),
  74. reply_handler=self.create_transfer_reply,
  75. error_handler=self.error)
  76. if __name__ == '__main__':
  77. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  78. parser = OptionParser()
  79. (options, args) = parse_options()
  80. if not options.device:
  81. parser.print_help()
  82. sys.exit(0)
  83. bus = dbus.SessionBus()
  84. mainloop = GObject.MainLoop()
  85. client = dbus.Interface(bus.get_object(BUS_NAME, PATH),
  86. CLIENT_INTERFACE)
  87. print("Creating Session")
  88. path = client.CreateSession(options.device, { "Target": "OPP" })
  89. opp_client = OppClient(path, options.verbose)
  90. if options.pull_to_file:
  91. opp_client.pull_business_card(options.pull_to_file)
  92. if options.send_file:
  93. opp_client.send_file(options.send_file)
  94. mainloop.run()