最为简单的情况下,除了了解清reactor的简单使用,你还要了解Protocol和Factory。它们最终都会由reactor的侦听建立和run来统一调度起来。
建立服务器的第一个要解决的问题就是服务与外界的交流协议。协议的定义在twisted中是通过继承twisted.internet.protocol.Protocol类来实现的。在协议中你可以定义连接、数据接收、断开连接等一系列的事件如果进行响应。但是对于所有连接上来的协议的建立、持久配置数据的存储这样的内容并不会保存在协议中。
持久配置数据的存储通常都会保存在工厂里。
工厂的定义在twisted中是通过继承twisted.internet.protocol.Factory类来实现的。twisted提供了缺省的工厂实现最为普通的需求。它会实例化每个协议,并且通过设置每个协议中factory属性来使协议可以使用到它自己,做这个设置的作用就是让协议在进行处理连接数据时可以使用到工厂中存储的持久配置数据。工厂的启动是需要reactor真实的建立侦听并启动才可以实现的。
reactor侦听的建立可以参考 twisted.internet.interfaces.IReactorTCP.listenTCP。这时我们先只说建立TCP服务器的侦听,如果你需要其它种类的侦听建立参考IReactor*.listen*系列API。
总结一下,我们书写一个twisted的Daemon,实质上会关注三个层次的对象。它们互相可配置,可独立开发,只需要通过简单的调用配置就可结合使用。第一层次就是侦听的建立、工厂的初始化、服务器的运行,它需要reactor的开发。第二个层次就是服务的初始化、用户连接的建立、持久配置数据的存储、协议的实例化,它需要factory的开发。第三个层次就是用户连接建立后的事件处理,这就需要protocol的开发了。
一、
protocol:内部实现的主要是连接在通信时的动作;
内部有transport,可进行write(), getpeer()(host,port)
还有factory
Factory:保存的是连接方的信息,当有链接过来时,factory会初始化一个protocol与对方建立连接,所以factory中有protocol. 因为protocol的方法经常会用到factory里的属性变量,所以protocol类中也有factory。这里就有一个循环引用的问题,Python中,有些类是有垃圾清理机制的,但是对于那些有定义方法__del__()的类,就会有内存泄露的问题。
Reactor:是管理twisted框架的核心。所有的事件都会触发reactor,然后他会开启服务,初始化factory,factory再初始化protocol。
Factory和protocol的基础实现在protocol.py文件中。
二、
Reactor:
是twisted框架中很重要的概念,事件驱动。他负责监测所有事件。
最常用的是run(), stop(), callLater()
callLater()实际是实例化了一个DelayedCall()类,并将实例的句柄返回。可进行cancel(), reset(), delay()等操作。
Reactor的实现,应该继承了internetase.py文件中的类ReactorBase(object)还有类_SignalReactorMixin,其中,_SignalReactorMixin类中有定义run()函数。
ReactorBase(object)类中,有方法stop()。好多reactor和thread的操作函数也能在这里面找到,上面的callLater()就是在这里实现的。
在internetposixbase.py文件中,有类PosixReactorBase(_SignalReactorMixin, ReactorBase),是上面说到的两个类的继承。在这里,实现了各种listen**函数,主要服务员server;还有对应的connect**函数,针对client。
<一>、例如:客户端可以使用:reactor. connectTCP(self, host, port, factory, timeout=30, bindAddress=None)
这是一个直接可以用的,所以reactor必定继承自PosixReactorBase。
在connectTCP()中,实现如下:
c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
c.connect()
return c
第一句:实例一个tcp.Connector类,该类继承自base. BaseConnector;
类base. BaseConnector实现的方法有:disconnect(), connect(),stopConnecting(self), cancelTimeout(self), buildProtocol(self, addr), connectionFailed(self, reason), connectionLost(self, reason)…
这些方法,和factory中的方法,函数名很相似,其实这些函数的内部就是调用相应的factory方法的。该类中有个很重要的变量就是self.factory。
第二句:是连接的发起函数,主要是开启了factory(connectTCP函数中传入的)。
Connect()定义如下:
def connect(self):
"""Start connection to remote server."""
if self.state != "disconnected":
raise RuntimeError, "can't connect in this state"
self.state = "connecting"
if not self.factoryStarted:
self.factory.doStart()
self.factoryStarted = 1
self.transport = transport = self._makeTransport()
if self.timeout is not None:
self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
self.factory.startedConnecting(self)
进过上面分析,reactor, factory, protocol就联系到一起了,很多操作也就清晰了。再回过去读最前面引用的内容,应该好理解多了。
<二>、对于服务端:reactor. listenTCP(self, port, factory, backlog=50, interface='')
具体实现如下:
p = tcp.Port(port, factory, backlog, interface, self)
p.startListening()
return p
第一句:实例一个tcp.Port类,该类继承了base.BasePort, _SocketCloser。
实现的方法有:
createInternetSocket(self),startListening(self),_buildAddr(self, (host, port)),
doRead(self),connectionLost(self, reason),getHost(self),doWrite(self)
loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE))
也有self.factory变量
第二句:做必要的准备,如:create and bind socket,侦听端口,读取数据等。
def startListening(self):
"""Create and bind my socket, and begin listening on it.
This is called on unserialization, and must be called after creating a
server to begin listening on the specified port.
"""
try:
skt = self.createInternetSocket()
skt.bind((self.interface, self.port))
except socket.error, le:
raise CannotListenError, (self.interface, self.port, le)
# Make sure that if we listened on port 0, we update that to
# reflect what the OS actually assigned us.
self._realPortNumber = skt.getsockname()[1]
log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))
# The order of the next 6 lines is kind of bizarre. If no one
# can explain it, perhaps we should re-arrange them.
self.factory.doStart()
skt.listen(self.backlog)
self.connected = True
self.socket = skt
self.fileno = self.socket.fileno
self.numberAccepts = 100
self.startReading()
其中skt是socket.socket()返回的socket句柄。
三、继续深入:
上面讲到的是reactor.connectTCP(),这个方法会直接开始工作的(初始化factory,protocol等),也许我们需要自己手工控制这个过程。下面是利用类去实现:
applictioninternet.py文件中的,类TCPClient,当时为了找到这个类名,花了很多时间,这个类名是通过组合而成。
其原型是:class _AbstractClient(_VolatileDataService)
还有class _AbstractServer(_VolatileDataService)作为服务端的原型
一下几个类均是上面两个类的一个模式:
TCPServer, TCPClient,
UNIXServer, UNIXClient,
SSLServer, SSLClient,
UDPServer, UDPClient,
UNIXDatagramServer, UNIXDatagramClient,
MulticastServer
class _AbstractClient和class _AbstractServer最终都是从applictionserver.py中Server继承而来的。两个类都是服务,只是在实现过程稍有差别,一个针对connect,一个针对port。
它们均有:
self.reactor,是通过参数传递进去的。
Self.method,保存的是连接的方式,如tcp,udp
self._connection,保存连接的(其实是上面的listen**或connect**的返回值)
已经实例了对象,如何开始服务(工作)呢? startService(self)
1)、先来看看class _AbstractServer中的:
def startService(self):
service.Service.startService(self)
if self._port is None:
self._port = self._getPort()
def _getPort(self):
"""
Wrapper around the appropriate listen method of the reactor.
@return: the port object returned by the listen method.
@rtype: an object providing L{IListeningPort}.
"""
if self.reactor is None:
from twisted.internet import reactor
else:
eactor = self.reactor
return getattr(reactor, 'listen%s' % (self.method,))(
*self.args, **self.kwargs)
Getattr()这个函数不错,呵呵
最终调用的还是reactor中的linsten**,但是经过是先定义一个server,再通过server.startServer()开启的。
2)、再来看看class _AbstractClient
def startService(self):
service.Service.startService(self)
self._connection = self._getConnection()
def _getConnection(self):
"""
Wrapper around the appropriate connect method of the reactor.
@return: the port object returned by the connect method.
@rtype: an object providing L{IConnector}.
"""
if self.reactor is None:
from twisted.internet import reactor
else:
reactor = self.reactor
return getattr(reactor, 'connect%s' % (self.method,))(
*self.args, **self.kwargs)
参照上面的,很清晰了。