| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- #!/usr/bin/python
- # SPDX-License-Identifier: LGPL-2.1-or-later
- from __future__ import absolute_import, print_function, unicode_literals
- from optparse import OptionParser
- import os.path
- import sys
- import dbus
- import dbus.service
- import dbus.mainloop.glib
- try:
- from gi.repository import GObject
- except ImportError:
- import gobject as GObject
# Bus name and object path handed to bus.get_object() to reach the object
# that implements CLIENT_INTERFACE.
BUS_NAME = 'org.bluez.obex'
PATH = '/org/bluez/obex'

# D-Bus interface names used by this script.
CLIENT_INTERFACE = 'org.bluez.obex.Client1'
SESSION_INTERFACE = 'org.bluez.obex.Session1'
FILE_TRANSFER_INTERFACE = 'org.bluez.obex.FileTransfer1'
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'

# Historical misspelling, kept as an alias so code that still refers to the
# old name keeps working.
FILE_TRASNFER_INTERFACE = FILE_TRANSFER_INTERFACE
def parse_options():
    """Register the script's command-line options and parse the arguments.

    The options are added to the module-level ``parser`` object, so that
    object has to exist before this function is called.

    Returns:
        The result of ``parser.parse_args()`` (the entry point unpacks it
        as ``(options, args)``).
    """
    # One entry per option, in the order they appear in the help output:
    # (flag spellings, keyword arguments for add_option).
    option_table = [
        (("-d", "--device"),
         dict(dest="device", help="Device to connect", metavar="DEVICE")),
        (("-c", "--chdir"),
         dict(dest="new_dir", help="Change current directory to DIR",
              metavar="DIR")),
        (("-l", "--list"),
         dict(action="store_true", dest="list_dir",
              help="List the current directory")),
        (("-g", "--get"),
         dict(dest="get_file", help="Get FILE", metavar="FILE")),
        (("-p", "--put"),
         dict(dest="put_file", help="Put FILE", metavar="FILE")),
        (("-y", "--copy"),
         dict(dest="copy_file", help="Copy FILE", metavar="FILE")),
        (("-m", "--move"),
         dict(dest="move_file", help="Move FILE", metavar="FILE")),
        (("-n", "--destname"),
         dict(dest="dest_file", help="Destination FILE", metavar="FILE")),
        (("-r", "--remove"),
         dict(dest="remove_file", help="Remove FILE", metavar="FILE")),
        (("-v", "--verbose"),
         dict(action="store_true", dest="verbose")),
    ]

    for flag_names, settings in option_table:
        parser.add_option(*flag_names, **settings)

    return parser.parse_args()
class FtpClient:
    """Helper that drives the FileTransfer1 interface of one OBEX session.

    Folder operations are issued as blocking D-Bus calls.  File operations
    (get/put/remove/move/copy) are issued asynchronously and report back
    through the reply/error callbacks defined on this class.  Several
    callbacks stop the module-level ``mainloop`` object, which the script
    entry point creates before using this class.
    """

    def __init__(self, session_path, verbose=False):
        """Bind to the session object and subscribe to property changes.

        Args:
            session_path: D-Bus object path of the session to use.
            verbose: when true, print extra status messages.
        """
        self.transferred = 0        # last "Transferred" value seen
        self.transfer_path = None   # object path of the transfer we follow
        self.transfer_size = 0      # "Size" of that transfer, 0 if unknown
        self.verbose = verbose
        bus = dbus.SessionBus()
        obj = bus.get_object(BUS_NAME, session_path)
        self.session = dbus.Interface(obj, SESSION_INTERFACE)
        self.ftp = dbus.Interface(obj, FILE_TRASNFER_INTERFACE)
        # No path filter is given here, so every PropertiesChanged signal on
        # the bus is delivered; properties_changed() filters by object path.
        bus.add_signal_receiver(self.properties_changed,
                                dbus_interface="org.freedesktop.DBus.Properties",
                                signal_name="PropertiesChanged",
                                path_keyword="path")

    def create_transfer_reply(self, path, properties):
        """Reply handler for PutFile/GetFile: remember the new transfer.

        Args:
            path: object path of the created transfer.
            properties: dictionary of the transfer's properties.
        """
        self.transfer_path = path
        # "Size" is not guaranteed to be in the reply; indexing it directly
        # would raise KeyError inside the callback.  Fall back to 0 and let
        # properties_changed() pick the value up if it is announced later.
        self.transfer_size = properties.get("Size", 0)
        if self.verbose:
            print("Transfer created: %s" % path)

    def generic_reply(self):
        """Reply handler for operations that return nothing."""
        if self.verbose:
            print("Operation succeeded")

    def error(self, err):
        """Error handler for asynchronous calls: print and stop the loop."""
        print(err)
        mainloop.quit()

    def properties_changed(self, interface, properties, invalidated, path):
        """Handle a PropertiesChanged signal.

        Signals for objects other than the current transfer are ignored.
        A "Status" of "complete" or "error" stops the main loop; a
        "Transferred" update prints a progress line.

        Args:
            interface: interface name carried by the signal (unused).
            properties: dictionary of changed properties.
            invalidated: list of invalidated property names (unused).
            path: object path that emitted the signal.
        """
        if path != self.transfer_path:
            return

        status = properties.get("Status")
        if status in ("complete", "error"):
            if self.verbose:
                print("Transfer %s" % status)
            mainloop.quit()
            return

        # Pick up a size that was not available when the transfer was created.
        if "Size" in properties:
            self.transfer_size = properties["Size"]

        if "Transferred" not in properties:
            return

        value = properties["Transferred"]
        # Difference since the previous update, scaled by 1000.  This is only
        # a true per-second rate if updates arrive once per second.
        speed = (value - self.transferred) / 1000
        print("Transfer progress %d/%d at %d kBps" % (value,
                                                      self.transfer_size,
                                                      speed))
        self.transferred = value

    def change_folder(self, new_dir):
        """Call ChangeFolder once for each "/"-separated part of new_dir."""
        for node in new_dir.split("/"):
            self.ftp.ChangeFolder(node)

    def list_folder(self):
        """Print the entries returned by ListFolder, one name per line.

        Entries whose "Type" is "folder" get a trailing "/".
        """
        for i in self.ftp.ListFolder():
            if i["Type"] == "folder":
                print("%s/" % (i["Name"]))
            else:
                print("%s" % (i["Name"]))

    def put_file(self, filename):
        """Start an asynchronous PutFile for filename.

        The absolute path of filename and its basename are passed as the two
        arguments.
        """
        self.ftp.PutFile(os.path.abspath(filename),
                         os.path.basename(filename),
                         reply_handler=self.create_transfer_reply,
                         error_handler=self.error)

    def get_file(self, filename):
        """Start an asynchronous GetFile for filename.

        The absolute path of filename and its basename are passed as the two
        arguments.
        """
        self.ftp.GetFile(os.path.abspath(filename),
                         os.path.basename(filename),
                         reply_handler=self.create_transfer_reply,
                         error_handler=self.error)

    def remove_file(self, filename):
        """Start an asynchronous Delete of filename."""
        self.ftp.Delete(filename,
                        reply_handler=self.generic_reply,
                        error_handler=self.error)

    def move_file(self, filename, destname):
        """Start an asynchronous MoveFile from filename to destname."""
        self.ftp.MoveFile(filename, destname,
                          reply_handler=self.generic_reply,
                          error_handler=self.error)

    def copy_file(self, filename, destname):
        """Start an asynchronous CopyFile from filename to destname."""
        self.ftp.CopyFile(filename, destname,
                          reply_handler=self.generic_reply,
                          error_handler=self.error)
if __name__ == '__main__':
    # Must be installed before any bus connection is made so that signals
    # and asynchronous replies are dispatched by the GLib main loop.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # parse_options() reads this module-level name.
    parser = OptionParser()

    (options, args) = parse_options()

    if not options.device:
        parser.print_help()
        sys.exit(0)

    # Move and copy need a destination name.  Check before any D-Bus traffic
    # so that a missing --destname gives a usage error instead of a failure
    # when None is passed as a string argument.
    if (options.move_file or options.copy_file) and not options.dest_file:
        parser.error("--move and --copy require --destname")

    bus = dbus.SessionBus()
    # FtpClient's callbacks read this module-level name to stop the loop.
    mainloop = GObject.MainLoop()

    client = dbus.Interface(bus.get_object(BUS_NAME, PATH,),
                            CLIENT_INTERFACE)

    print("Creating Session")
    path = client.CreateSession(options.device, { "Target": "ftp" })

    ftp_client = FtpClient(path, options.verbose)

    # Folder operations are blocking calls and are finished when they return.
    if options.new_dir:
        ftp_client.change_folder(options.new_dir)

    if options.list_dir:
        ftp_client.list_folder()

    # The remaining operations are asynchronous; their callbacks only run
    # while the main loop is running.
    pending = False

    if options.get_file:
        ftp_client.get_file(options.get_file)
        pending = True

    if options.put_file:
        ftp_client.put_file(options.put_file)
        pending = True

    if options.move_file:
        ftp_client.move_file(options.move_file, options.dest_file)
        pending = True

    if options.copy_file:
        ftp_client.copy_file(options.copy_file, options.dest_file)
        pending = True

    if options.remove_file:
        ftp_client.remove_file(options.remove_file)
        pending = True

    # With nothing asynchronous outstanding no callback could ever quit the
    # loop, so running it would block forever.
    if pending:
        mainloop.run()
|