Data types
Unlike Python, D-Bus is statically typed - each method has a certain signature representing the types of its arguments, and will not accept arguments of other types.
See the official tutorial for the details; they are not repeated here.
Connecting to the Bus
import dbus

session_bus = dbus.SessionBus()
system_bus = dbus.SystemBus()
Making method calls (synchronous calls)
- The bus name. This identifies which application you want to communicate with.
- The object path. An application can export many objects; to identify which one you want to interact with, you use an object path.
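Object paths have a fixed syntax in the D-Bus specification: they start with a slash, and each slash-separated element may contain only ASCII letters, digits and underscores. The sketch below checks that syntax in plain Python. The helper name is_valid_object_path is made up for illustration and is not part of dbus-python.

```python
import re

# One or more "/element" groups; each element is a non-empty run of [A-Za-z0-9_].
# The root path "/" is the only path allowed to end with a slash and is handled separately.
_OBJECT_PATH_RE = re.compile(r"(/[A-Za-z0-9_]+)+")


def is_valid_object_path(path):
    """Return True if path follows the D-Bus object path syntax (hypothetical helper)."""
    return path == "/" or _OBJECT_PATH_RE.fullmatch(path) is not None


print(is_valid_object_path("/org/freedesktop/NetworkManager/Devices/0"))  # True
print(is_valid_object_path("org/freedesktop"))  # False: no leading slash
print(is_valid_object_path("/a//b"))  # False: empty element
```

A check like this can be handy for catching a malformed path before it reaches the bus.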
Proxy objects
bus = dbus.SystemBus()  # NetworkManager lives on the system bus
proxy = bus.get_object('org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager/Devices/0')
Interfaces and methods
# Use the proxy object directly: call the method Introspect (which takes no arguments) on it
props = proxy.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')
# props is a dbus.String holding the object's introspection XML
As a short cut, if you’re going to be calling many methods with the same interface, you can construct a dbus.Interface object and call methods on that, without needing to specify the interface again:
# Call the method indirectly through an Interface object
eth0_dev_iface = dbus.Interface(proxy, dbus_interface='org.freedesktop.DBus.Introspectable')
props2 = eth0_dev_iface.Introspect()
# props2 is the same as before
You can print the result directly with print(props). If you check type(props), however, you get <class 'dbus.String'> rather than the standard Python str. See the Data types section at the beginning of this article.
Making asynchronous calls
Setting up an event loop
To make asynchronous calls, you first need an event loop or “main loop”.
from dbus.mainloop.glib import DBusGMainLoop

DBusGMainLoop(set_as_default=True)  # You must do this before connecting to the bus.

# Then run the PyGObject (pygi) main loop that the D-Bus main loop is attached to
from gi.repository import GLib

loop = GLib.MainLoop()
loop.run()

# Alternatively, pass the main loop explicitly to each connection
import dbus
from dbus.mainloop.glib import DBusGMainLoop

dbus_loop = DBusGMainLoop()
bus = dbus.SessionBus(mainloop=dbus_loop)
PyQt v4.2 and later includes support for integrating dbus-python with the Qt event loop. To connect D-Bus to this main loop, call dbus.mainloop.qt.DBusQtMainLoop instead of dbus.mainloop.glib.DBusGMainLoop. Otherwise the Qt loop is used in exactly the same way as the GLib loop.
Asynchronous method calls
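To make a call asynchronous, pass two callables as the keyword arguments reply_handler and error_handler to the proxy method. The call returns None immediately; later, while the event loop is running, one of the following happens:
- the reply_handler will be called with the method's return values as arguments; or
- the error_handler will be called with one argument, an instance of DBusException representing a remote exception.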
# To make an async call, use the reply_handler and error_handler kwargs
remote_object.HelloWorld("Hello from example-async-client.py!",
                         dbus_interface='com.example.SampleInterface',
                         reply_handler=handle_hello_reply,
                         error_handler=handle_hello_error)

# Interface objects also support async calls
iface = dbus.Interface(remote_object, 'com.example.SampleInterface')
iface.RaiseException(reply_handler=handle_raise_reply,
                     error_handler=handle_raise_error)
Implementing the reply_handler and error_handler callbacks:
def handle_hello_reply(r_args):
    print(str(r_args))

def handle_hello_error(err_args):
    print("HelloWorld raised an exception! That's not meant to happen...")
    print("    ", str(err_args))
    if True:  # ...
        loop.quit()
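To make the callback contract concrete without needing a running bus, here is a toy model in plain Python. It is not dbus-python: ToyLoop and call_async are invented stand-ins that only imitate the behaviour described above, namely that the call returns None at once and that exactly one of the two handlers runs later, when the loop runs.

```python
from collections import deque


class ToyLoop:
    """Invented stand-in for a main loop: runs queued callbacks in FIFO order."""

    def __init__(self):
        self._pending = deque()

    def call_later(self, func):
        self._pending.append(func)

    def run(self):
        while self._pending:
            self._pending.popleft()()


def call_async(loop, method, args, reply_handler, error_handler):
    """Schedule method(*args) on the loop and return None immediately."""

    def dispatch():
        try:
            result = method(*args)
        except Exception as exc:
            error_handler(exc)      # failure: only the error handler runs
        else:
            reply_handler(result)   # success: only the reply handler runs

    loop.call_later(dispatch)
    return None


replies, errors = [], []
loop = ToyLoop()
returned = call_async(loop, str.upper, ("hello",), replies.append, errors.append)
call_async(loop, int, ("not a number",), replies.append, errors.append)
print(returned, replies, errors)  # None [] [] (nothing has run yet)
loop.run()
print(replies, len(errors))       # ['HELLO'] 1
```

The point of the model is the ordering: no handler fires until the loop runs, which is why the real client must start its main loop after issuing the calls.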
Receiving signals
Signal matching
To respond to signals, call add_signal_receiver() on a Bus. It arranges for a callback to be invoked when a matching signal is received, and takes the following arguments:
- a callable (the handler_function) which will be called by the event loop when the signal is received
- the signal name, signal_name
- the D-Bus interface, dbus_interface
- a sender bus name (well-known or unique), bus_name
- a sender object path, path
add_signal_receiver() returns a SignalMatch object. Its only useful public API at the moment is a remove() method with no arguments, which removes the signal match from the connection.
def catchall_signal_handler(*args, **kwargs):
    print("Caught signal (in catchall handler) "
          + kwargs['dbus_interface'] + "." + kwargs['member'])
    for arg in args:
        print("        " + str(arg))

def catchall_hello_signals_handler(hello_string):
    print("Received a hello signal and it says " + hello_string)

def catchall_testservice_interface_handler(hello_string, dbus_message):
    print("com.example.TestService interface says " + hello_string
          + " when it sent signal " + dbus_message.get_member())

# catch the signal
bus = dbus.SessionBus()
bus.add_signal_receiver(catchall_signal_handler,
                        interface_keyword='dbus_interface',
                        member_keyword='member')
bus.add_signal_receiver(catchall_hello_signals_handler,
                        dbus_interface="com.example.TestService",
                        signal_name="HelloSignal")
bus.add_signal_receiver(catchall_testservice_interface_handler,
                        dbus_interface="com.example.TestService",
                        message_keyword='dbus_message')
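At the protocol level, the filters passed to add_signal_receiver correspond to a D-Bus match rule: a comma-separated string of key='value' pairs registered with the bus daemon. The sketch below shows roughly what such a rule looks like. The helper build_match_rule is hypothetical, the exact string and key order dbus-python generates may differ, and values containing apostrophes would need escaping that is not handled here.

```python
def build_match_rule(signal_name=None, dbus_interface=None, bus_name=None,
                     path=None, **arg_matches):
    """Build a D-Bus match rule string for a signal (hypothetical helper)."""
    # Only argN keywords (arg0, arg1, ...) are accepted as extra filters.
    for key in arg_matches:
        if not (key.startswith("arg") and key[3:].isdecimal()):
            raise TypeError("unexpected keyword argument %r" % key)

    parts = [("type", "signal")]
    if bus_name is not None:
        parts.append(("sender", bus_name))
    if dbus_interface is not None:
        parts.append(("interface", dbus_interface))
    if signal_name is not None:
        parts.append(("member", signal_name))
    if path is not None:
        parts.append(("path", path))
    for key in sorted(arg_matches, key=lambda k: int(k[3:])):
        parts.append((key, arg_matches[key]))
    return ",".join("%s='%s'" % (key, value) for key, value in parts)


print(build_match_rule(signal_name="HelloSignal",
                       dbus_interface="com.example.TestService"))
# type='signal',interface='com.example.TestService',member='HelloSignal'
```

Omitting a filter simply leaves its key out of the rule, which is why the catch-all handler above receives every signal on the bus.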
Receiving signals from a proxy object
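Proxy objects have a connect_to_signal() method, which arranges for a callback to be called when a signal is received from the corresponding remote object. Its parameters are:
- the name of the signal
- a callable (the handler function) which will be called by the event loop when the signal is received
- the keyword argument dbus_interface, which qualifies the signal name with its interface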
Getting more information from a signal
You can also arrange for more information to be passed to the handler function. If you pass the keyword arguments sender_keyword, destination_keyword, interface_keyword, member_keyword or path_keyword to the connect_to_signal() method, the appropriate part of the signal message will be passed to the handler function as a keyword argument. For instance, if you use:
def handler(sender=None):
    print("got signal from %r" % sender)

iface.connect_to_signal("Hello", handler, sender_keyword='sender')
and a signal Hello with no arguments is received from "com.example.Foo", the handler() function will be called with sender='com.example.Foo'.
String argument matching
If you pass a keyword argument of the form argN (for example arg0) together with a string value, the handler will only be called if argument N of the signal (numbered from zero) is a D-Bus string (in particular, not an object-path or a signature) with that value.
def hello_signal_handler(hello_string):
    print("Received signal (by connecting using remote object) and it says: "
          + hello_string)

proxy = bus.get_object("com.example.TestService", "/com/example/TestService/object")
proxy.connect_to_signal("HelloSignal", hello_signal_handler,
                        dbus_interface="com.example.TestService",
                        arg0="Hello")  # String argument matching
Exporting objects
Inheriting from dbus.service.Object
Object expects either a BusName or a Bus object, and an object-path, to be passed to its constructor: arrange for this information to be available. For example:
class Example(dbus.service.Object):
    def __init__(self, object_path):
        dbus.service.Object.__init__(self, dbus.SessionBus(), object_path)
This object will automatically support introspection, but won’t do anything particularly interesting. To fix that, you’ll need to export some methods and signals too.
Exporting methods with dbus.service.method
To export a method, use the decorator dbus.service.method. For example:
class Example(dbus.service.Object):
    def __init__(self, object_path):
        dbus.service.Object.__init__(self, dbus.SessionBus(), object_path)

    @dbus.service.method(dbus_interface='com.example.Sample',
                         in_signature='v', out_signature='s')
    def StringifyVariant(self, variant):
        return str(variant)
The in_signature and out_signature are D-Bus signature strings as described in Data Types.
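A signature string is a concatenation of complete types: single-letter basic types such as s (string) or u (unsigned 32-bit integer), v for a variant, a followed by an element type for arrays, a{KV} for dictionaries and (...) for structs. The following plain-Python sketch splits a signature into its complete types to show how such strings are read. The function split_signature is a made-up helper, not a dbus-python API, and it is not a full validator (for instance, it does not reject empty structs or non-basic dictionary keys).

```python
BASIC_TYPE_CODES = set("ybnqiuxtdsogh")  # bytes, booleans, integers, doubles, strings, paths, ...


def _end_of_complete_type(sig, pos):
    """Return the index just past the single complete type that starts at pos."""
    if pos >= len(sig):
        raise ValueError("signature ends unexpectedly: %r" % sig)
    code = sig[pos]
    if code in BASIC_TYPE_CODES or code == "v":      # basic type or variant
        return pos + 1
    if code == "a":
        if sig[pos + 1:pos + 2] == "{":              # dictionary: a{KV}
            key_end = _end_of_complete_type(sig, pos + 2)
            value_end = _end_of_complete_type(sig, key_end)
            if sig[value_end:value_end + 1] != "}":
                raise ValueError("unterminated dict entry in %r" % sig)
            return value_end + 1
        return _end_of_complete_type(sig, pos + 1)   # array of one complete type
    if code == "(":                                  # struct: (...)
        pos += 1
        while sig[pos:pos + 1] != ")":
            pos = _end_of_complete_type(sig, pos)    # raises if the struct never closes
        return pos + 1
    raise ValueError("unknown type code %r in %r" % (code, sig))


def split_signature(sig):
    """Split a D-Bus signature into its complete types (hypothetical helper)."""
    types, pos = [], 0
    while pos < len(sig):
        end = _end_of_complete_type(sig, pos)
        types.append(sig[pos:end])
        pos = end
    return types


print(split_signature("us"))           # ['u', 's']
print(split_signature("a{sv}(ii)as"))  # ['a{sv}', '(ii)', 'as']
```

Read this way, in_signature='v' means one argument of any type wrapped in a variant, and out_signature='s' means a single string return value.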
Finding out the caller’s bus name
The method decorator accepts a sender_keyword keyword argument. If you set that to a string, the unique bus name of the sender will be passed to the decorated method as a keyword argument of that name:
class Example(dbus.service.Object):
    def __init__(self, object_path):
        dbus.service.Object.__init__(self, dbus.SessionBus(), object_path)

    @dbus.service.method(dbus_interface='com.example.Sample',
                         in_signature='', out_signature='s',
                         sender_keyword='sender')
    def SayHello(self, sender=None):
        return 'Hello, %s!' % sender
        # -> something like 'Hello, :1.1!'
Asynchronous method implementations
class SomeObject(dbus.service.Object):
    @dbus.service.method("com.example.SampleInterface",
                         in_signature='s', out_signature='as')
    def HelloWorld(self, hello_message):
        print(str(hello_message))
        return ["Hello", " from example-service.py", "with unique name",
                session_bus.get_unique_name()]
Emitting signals with dbus.service.signal
To export a signal, use the decorator dbus.service.signal; to emit that signal, call the decorated method. The decorated method can also contain code which will be run when called, as usual. For example:
class Example(dbus.service.Object):
    def __init__(self, object_path):
        dbus.service.Object.__init__(self, dbus.SessionBus(), object_path)

    @dbus.service.signal(dbus_interface='com.example.Sample', signature='us')
    def NumberOfBottlesChanged(self, number, contents):
        print("%d bottles of %s on the wall" % (number, contents))

# Object path elements may contain only letters, digits and underscores
e = Example('/bottle_counter')
e.NumberOfBottlesChanged(100, 'beer')
# -> emits com.example.Sample.NumberOfBottlesChanged(100, 'beer')
#    and prints "100 bottles of beer on the wall"
The signal will be queued for sending when the decorated method returns - you can prevent the signal from being sent by raising an exception from the decorated method (for instance, if the parameters are inappropriate). The signal will only actually be sent when the event loop next runs.
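The "run the body first, queue the signal only if it returns" behaviour can be imitated with an ordinary decorator. What follows is a toy model in plain Python, not the dbus.service.signal implementation: toy_signal and outbox are invented names, and outbox merely stands in for the connection's outgoing queue.

```python
import functools

outbox = []  # invented stand-in for the connection's outgoing message queue


def toy_signal(dbus_interface):
    """Toy decorator: run the method body, then queue a signal if it returned."""

    def decorator(func):
        @functools.wraps(func)
        def emit(self, *args):
            func(self, *args)  # an exception raised here skips the append below
            outbox.append((dbus_interface, func.__name__, args))

        return emit

    return decorator


class BottleCounter:
    @toy_signal("com.example.Sample")
    def NumberOfBottlesChanged(self, number, contents):
        if number < 0:
            raise ValueError("number of bottles cannot be negative")


counter = BottleCounter()
counter.NumberOfBottlesChanged(100, "beer")     # body returns, so the signal is queued
try:
    counter.NumberOfBottlesChanged(-1, "beer")  # body raises, so nothing is queued
except ValueError:
    pass
print(outbox)  # [('com.example.Sample', 'NumberOfBottlesChanged', (100, 'beer'))]
```

In the real library the queued message still waits for the event loop before it goes out on the wire; the toy only models the validation step.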
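Questions to think about
- Isn't calling a service through a proxy the standard pattern? Why does the official tutorial, in the section Receiving signals from a proxy object, advise against creating a proxy just to listen for signals? What is the extra overhead?
Answer, following the official explanation: creating a proxy for an object may activate (start) the application that owns it if that application is not already running. That leaves bus.add_signal_receiver() as the method to use, which suggests that its implementation does not create a proxy at all and merely listens for matching signals on the bus.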