| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- #!/usr/bin/python
- # SPDX-License-Identifier: LGPL-2.1-or-later
- from __future__ import absolute_import, print_function, unicode_literals
- # -*- coding: utf-8 -*-
- import sys
- import dbus
- import dbus.service
- from dbus.mainloop.glib import DBusGMainLoop
- try:
- from gi.repository import GObject
- except ImportError:
- import gobject as GObject
# Bus name and object path handed to bus.get_object() to reach the
# health manager object.
BUS_NAME = "org.bluez"
PATH = "/org/bluez"

# Names of the D-Bus interfaces this script talks to.
ADAPTER_INTERFACE = "org.bluez.Adapter1"
HEALTH_MANAGER_INTERFACE = "org.bluez.HealthManager1"
HEALTH_DEVICE_INTERFACE = "org.bluez.HealthDevice1"

# Make the GLib main loop the default dbus-python main loop before the
# bus connection is created.
DBusGMainLoop(set_as_default=True)

# NOTE(review): `loop` is not referenced by any other visible code;
# enter_mainloop() builds its own MainLoop instance.
loop = GObject.MainLoop()

# Connection to the system bus, shared by everything below.
bus = dbus.SystemBus()
def sig_received(*args, **kwargs):
    """Print a received signal: its member name, object path and payload.

    The handler expects the signal name under the ``member`` keyword and
    the emitting object path under the ``path`` keyword. If either
    keyword is missing the call is ignored and nothing is printed.

    For a ``PropertyChanged`` signal the payload must be exactly two
    positional values (printed one per line); unpacking raises
    ``ValueError`` otherwise. For any other signal only the first
    positional value is printed; ``IndexError`` is raised if there is
    none.
    """
    # Both keywords are required; bail out silently when one is absent.
    if not ("member" in kwargs and "path" in kwargs):
        return

    member = kwargs["member"]
    obj_path = kwargs["path"]
    for header in (member, obj_path):
        print(header)

    if member != "PropertyChanged":
        # Every other signal: show only its first argument.
        print(args[0])
        return

    # PropertyChanged carries a (name, value) pair.
    prop_name, prop_value = args
    print(prop_name)
    print(prop_value)
def enter_mainloop():
    """Subscribe to health-device signals and block in a GLib main loop.

    Registers ``sig_received`` on the shared bus connection for signals
    of ``HEALTH_DEVICE_INTERFACE`` sent by ``BUS_NAME``, asking for the
    object path, member name and interface name to be passed to the
    handler as the ``path``, ``member`` and ``interface`` keywords.
    A fresh main loop is then run until it returns or Ctrl+C is pressed;
    the farewell message is printed in either case.
    """
    bus.add_signal_receiver(
        sig_received,
        dbus_interface=HEALTH_DEVICE_INTERFACE,
        bus_name=BUS_NAME,
        interface_keyword="interface",
        member_keyword="member",
        path_keyword="path",
    )

    try:
        print("Entering main lopp, push Ctrl+C for finish")
        GObject.MainLoop().run()
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to leave the loop; not an error.
        pass
    finally:
        print("Exiting, bye")
# Proxy for the health manager object; used to create (and later destroy)
# the HDP application.
hdp_manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                             HEALTH_MANAGER_INTERFACE)

# --- Role selection -------------------------------------------------------
role = None
while role is None:
    print("Select 1. source or 2. sink: ")
    try:
        line = sys.stdin.readline()
        if not line:
            # readline() returns "" only at EOF: no answer can ever arrive,
            # so leave instead of re-prompting forever.
            sys.exit()
        sel = int(line)
        if sel == 1:
            role = "source"
        elif sel == 2:
            role = "sink"
        else:
            raise ValueError
    except (TypeError, ValueError):
        print("Wrong selection, try again: ")
    except KeyboardInterrupt:
        sys.exit()

# --- Data type selection (must fit an unsigned 16-bit value) -------------
dtype = None
while dtype is None:
    print("Select a data type: ")
    try:
        line = sys.stdin.readline()
        if not line:
            # EOF on stdin, see above.
            sys.exit()
        sel = int(line)
        if not 0 <= sel <= 65535:
            raise ValueError
        dtype = sel
    except (TypeError, ValueError):
        print("Wrong selection, try again: ")
    except KeyboardInterrupt:
        sys.exit()

# --- Preferred channel type (asked for sources only) and registration ----
pref = None
if role == "source":
    while pref is None:
        try:
            # One print call so the whole question stays on a single line.
            print("Select a preferred data channel type 1. "
                  "reliable 2. streaming: ")
            line = sys.stdin.readline()
            if not line:
                # EOF on stdin, see above.
                sys.exit()
            sel = int(line)
            if sel == 1:
                pref = "reliable"
            elif sel == 2:
                pref = "streaming"
            else:
                raise ValueError
        except (TypeError, ValueError):
            print("Wrong selection, try again")
        except KeyboardInterrupt:
            sys.exit()

    app_path = hdp_manager.CreateApplication({
        "DataType": dbus.types.UInt16(dtype),
        "Role": role,
        "Description": "Test Source",
        "ChannelType": pref})
else:
    app_path = hdp_manager.CreateApplication({
        "DataType": dbus.types.UInt16(dtype),
        "Description": "Test sink",
        "Role": role})

print("New application created:", app_path)
# Ask whether to actively connect to a remote device. If not, just wait
# for incoming signals in the main loop and exit afterwards.
con = None
while con is None:
    try:
        print("Connect to a remote device (y/n)? ")
        sel = sys.stdin.readline()
        if not sel:
            # readline() returns "" only at EOF: no answer can ever arrive,
            # so leave instead of re-prompting forever.
            sys.exit()
        # Compare case-insensitively and ignore surrounding whitespace so
        # that e.g. "Yes" or " y " are accepted as well.
        answer = sel.strip().lower()
        if answer in ("y", "yes"):
            con = True
        elif answer in ("n", "no"):
            con = False
        else:
            print("Wrong selection, try again.")
    except KeyboardInterrupt:
        sys.exit()

if not con:
    enter_mainloop()
    sys.exit()
# Fetch every managed object and keep the paths of those exposing the
# adapter interface.
manager = dbus.Interface(bus.get_object(BUS_NAME, "/"),
                         "org.freedesktop.DBus.ObjectManager")
objects = manager.GetManagedObjects()

adapters = []
# items() / `in` work on both Python 2 and 3; iteritems() and has_key()
# were removed in Python 3.
for path, ifaces in objects.items():
    if ADAPTER_INTERFACE in ifaces:
        adapters.append(path)

if not adapters:
    # Without this guard the selection loop below could never succeed.
    print("No adapters available")
    sys.exit()

for i, ad in enumerate(adapters, 1):
    print("%d. %s" % (i, ad))

print("Select an adapter: ")
select = None
while select is None:
    try:
        line = sys.stdin.readline()
        if not line:
            # readline() returns "" only at EOF: no answer can ever arrive,
            # so leave instead of re-prompting forever.
            sys.exit()
        # The menu is 1-based; reject 0 and negatives explicitly because a
        # negative index would silently pick from the end of the list.
        pos = int(line) - 1
        if pos < 0:
            raise IndexError
        select = adapters[pos]
    except (TypeError, IndexError, ValueError):
        print("Wrong selection, try again: ")
    except KeyboardInterrupt:
        sys.exit()

adapter = dbus.Interface(bus.get_object(BUS_NAME, select), ADAPTER_INTERFACE)
# Collect the devices that belong to the selected adapter (`select` still
# holds the adapter path here) and expose the health device interface.
devices = []
# items() works on both Python 2 and 3; iteritems() was removed in Python 3.
for path, interfaces in objects.items():
    if "org.bluez.Device1" not in interfaces:
        continue
    properties = interfaces["org.bluez.Device1"]
    if properties["Adapter"] != select:
        continue
    if HEALTH_DEVICE_INTERFACE not in interfaces:
        continue
    devices.append(path)

if not devices:
    print("No devices available")
    sys.exit()

for i, dev in enumerate(devices, 1):
    print("%d. %s" % (i, dev))

print("Select a device: ")
# From here on `select` is reused for the chosen device path.
select = None
while select is None:
    try:
        line = sys.stdin.readline()
        if not line:
            # readline() returns "" only at EOF: no answer can ever arrive,
            # so leave instead of re-prompting forever.
            sys.exit()
        # The menu is 1-based; reject 0 and negatives explicitly because a
        # negative index would silently pick from the end of the list.
        pos = int(line) - 1
        if pos < 0:
            raise IndexError
        select = devices[pos]
    except (TypeError, IndexError, ValueError):
        print("Wrong selection, try again: ")
    except KeyboardInterrupt:
        sys.exit()

device = dbus.Interface(bus.get_object(BUS_NAME, select),
                        HEALTH_DEVICE_INTERFACE)
# Optionally run an echo against the selected device before opening a
# data channel; a failed echo ends the script.
echo = None
while echo is None:
    try:
        print("Perform an echo (y/n)? ")
        sel = sys.stdin.readline()
        if not sel:
            # readline() returns "" only at EOF: no answer can ever arrive,
            # so leave instead of re-prompting forever.
            sys.exit()
        # Compare case-insensitively and ignore surrounding whitespace so
        # that e.g. "Yes" or " y " are accepted as well.
        answer = sel.strip().lower()
        if answer in ("y", "yes"):
            echo = True
        elif answer in ("n", "no"):
            echo = False
        else:
            print("Wrong selection, try again.")
    except KeyboardInterrupt:
        sys.exit()

if echo:
    if device.Echo():
        print("Echo was ok")
    else:
        print("Echo war wrong, exiting")
        sys.exit()
# Open a data channel to the chosen device, listen for signals until the
# main loop ends, then unregister the application.
print("Connecting to device %s" % (select))

# A source asks for a "reliable" channel; a sink accepts "any".
channel_config = "reliable" if role == "source" else "any"
chan = device.CreateChannel(app_path, channel_config)
print(chan)

enter_mainloop()

hdp_manager.DestroyApplication(app_path)
|