zoukankan      html  css  js  c++  java
  • Python twisted事件驱动网络框架 源码剖析

    一、Twisted简介

      Twisted是一个事件驱动的网络框架,其中包含了诸多功能,例如:网络协议、线程、数据库管理、网络操作、电子邮件等。

      事件驱动简而言之,事件驱动分为二个部分:第一,注册事件;第二,触发事件。

    Protocols

      Protocols描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中的例子。Protocols实现了IProtocol接口,它包含如下的方法:

      makeConnection    在transport对象和服务器之间建立一条连接
      connectionMade     连接建立起来后调用
      dataReceived       接收数据时调用
      connectionLost      关闭连接时调用

    Transports

      Transports代表网络中两个通信结点之间的连接。Transports负责描述连接的细节,比如连接是面向流式的还是面向数据报的,流控以及可靠性。TCP、UDP和Unix套接字可作为transports的例子。它们被设计为“满足最小功能单元,同时具有最大程度的可复用性”,而且从协议实现中分离出来,这让许多协议可以采用相同类型的传输。Transports实现了ITransports接口,它包含如下的方法:

      write                   以非阻塞的方式按顺序依次将数据写到物理连接上
      writeSequence  将一个字符串列表写到物理连接上
      loseConnection   将所有挂起的数据写入,然后关闭连接
      getPeer       取得连接中对端的地址信息
      getHost      取得连接中本端的地址信息

      将transports从协议中分离出来也使得对这两个层次的测试变得更加简单。可以通过简单地写入一个字符串来模拟传输,用这种方式来检查。

    二、源码分析

    EchoServer:

    from twisted.internet import protocol
    from twisted.internet import reactor        #reactor无限循环,写好了事件,reactor自动检测,类似于select
      
    class Echo(protocol.Protocol):
        def dataReceived(self, data):       #只要twisted一收到数据,就会调用dataRecevied方法
            self.transport.write(data)      #把收到的数据返回给客户端
      
    def main():
        factory = protocol.ServerFactory()      #定义一个基类,类似于socketserver的handler上面一层的类
        factory.protocol = Echo     #类似于socketserver中的handler,必须定义此Echo,表明每一个客户端过来后都会调用Echo建立一个实例
      
        reactor.listenTCP(1234,factory)     #reactor类似于select,是一个触发器,检测1234端口,需要把定义的基础类传进来
        reactor.run()       #reactor执行
      
    if __name__ == '__main__':
        main()

    EchoClient:

    from twisted.internet import reactor, protocol
      
      
    # a client protocol
      
    class EchoClient(protocol.Protocol):
        """Once connected, send a message, then print the result."""
      
        def connectionMade(self):       #只要连接一建立成功,就会自动调用此方法
            self.transport.write("hello!")      #给服务端发送hello 
      
        def dataReceived(self, data):       #当有数据收到时,就会调用这个方法,自动进行
            "As soon as any data is received, write it back."
            print "Server said:", data      #收到数据后打印数据
            self.transport.loseConnection()     #数据传送完毕后,关闭连接,执行了下面的方法  |
    #                                                                                     v
    #                       ---------<--------------<-----------------<----------------<---
    #                       |
    #                       v
        def connectionLost(self, reason):       #client connection断开了,会执行此方法,此为自己定义的connectionLost方法
            print "connection lost"
      
    class EchoFactory(protocol.ClientFactory):
        protocol = EchoClient       #在类中定义protocal,重写这个类;EchoClient相当于socketserver中的handle方法
      
        def clientConnectionFailed(self, connector, reason):        #如果reactor连接不上服务端,自动调用这方法
            print "Connection failed - goodbye!"        #打印连接失败信息
            reactor.stop()      #关闭连接
      
        def clientConnectionLost(self, connector, reason):      #如果client connection断开了,会自动调用此方法,类似于socketserver的handle后面的finish方法,和上面的connectionLost方法不同。
            print "Connection lost - goodbye!"      #打印连接断开信息
            reactor.stop()      #关闭连接
      
      
    # this connects the protocol to a server running on port 8000
    def main():
        f = EchoFactory()       #创建一个客户端的基类,与服务端的ServerFactory类似
        reactor.connectTCP("localhost", 1234, f)        #连接'localhost',端口号,把客户端的基类传入reactor
        reactor.run()       #运行reactor
      
    # this only runs if the module was *not* imported
    if __name__ == '__main__':
        main()      #程序入口,进入主程序

      运行服务器端脚本将启动一个TCP服务器,监听端口1234上的连接。服务器采用的是Echo协议,数据经TCP transport对象写出。运行客户端脚本将对服务器发起一个TCP连接,回显服务器端的回应然后终止连接并停止reactor事件循环。这里的 Factory用来对连接的双方生成protocol对象实例。两端的通信是异步的,connectTCP负责注册回调函数到reactor事件循环中,当socket上有数据可读时通知回调处理。

    三、RPC客户端向服务端发送命令源码分析:

    RPC server:

    #Project interpreter: 2.7
    import pika, os, time
    
    def operate(body):
        sys_result=os.popen(body).read()
        print("%s client execute 33[1;31;0m%s33[0m result:
    %s" % (time.strftime('%Y-%m-%d %H:%M:%S'),body,sys_result))
        return sys_result
    
    def on_request(ch, method, props, body):
        response = operate(body)
        ch.basic_publish(exchange='',   #basic_publish指向管道内发送数据
                         routing_key=props.reply_to,    #指定向哪个队列发数据
                         properties=pika.BasicProperties(correlation_id = props.correlation_id),
                         body=str(response))    #body是发送的消息内容
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    if __name__ == '__main__':
        try:
            connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))   #这是阻塞的连接
            channel = connection.channel()  #生成一个管道
            channel.queue_declare(queue='rpc_queue')    #在管道中创建一个队列,名字叫rpc_queue
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(on_request, queue='rpc_queue')
            print("Server is waiting RPC requests...")
            channel.start_consuming()   #开始接收数据,阻塞状态
        except KeyboardInterrupt:
            print("Connection lost...")

    RPC client:

    #Project interpreter: 2.7
    import pika, uuid
    
    class OperateRpcClient(object):   #对类进行实例化
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            self.channel = self.connection.channel()
            result = self.channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将此queue删除
            self.callback_queue = result.method.queue   #服务端执行完结果返回的queue名字
            self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)    #no_ack不需要确认,如果为False,当客户端消费完成后,会给服务端发送确认消息;queue参数指定了收取消息队列的名称
    
        def on_response(self, ch, method, props, body): #回调方法
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.input = raw_input("root@client>> ")
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                             reply_to = self.callback_queue,
                                             correlation_id = self.corr_id,
                                             ),
                                       body=self.input)
            while self.response is None:
                self.connection.process_data_events()   #不断的去Queue里面接收数据,而且不是阻塞的
            return self.response
    
    if __name__ == '__main__':
        print("This program use rabbitMQ send your OS command to server, your can use common command at here, enjoy it!
    	e.g.
    		1.ls
    		2.pwd
    		3.free -m
    		4.df -Th
    		5.netstat -anplute")
        while True:
            try:
                operate_rpc = OperateRpcClient()
                response = operate_rpc.call()
                print(response)
            except KeyboardInterrupt:
                print("Connection lost...")
  • 相关阅读:
    java实现向有序数组中插入一个元素
    java实现向有序数组中插入一个元素
    java实现向有序数组中插入一个元素
    java实现字符逆序输出
    java实现字符逆序输出
    java实现字符逆序输出
    将博客搬至CSDN
    sql server链接查询
    sql server模糊查询、分组
    sql server简单查询
  • 原文地址:https://www.cnblogs.com/stefan-liu/p/5307377.html
Copyright © 2011-2022 走看看