zoukankan      html  css  js  c++  java
  • Twisted 介绍 及TCP广播系统实例

    twisted 提供更多传输层 udp,tcp,tls及应用层HTTP,FTP等协议的支持,在开发方法上更提供了丰富的特性来支持异步编程

    安装twisted

    建议使用anaconda 安装,conda install twisted,

    Windows 安装需要先安装两个支持库,zope inteeface和pywin32 然后pip install twisted,

    Linux系统 直接命令行 apt-get install python-twisted 或者yum install python-twisted 

    实战演练 - 开发TCP广播系统

    系统描述:广播系统接收任意客户端的链接请求,并将任意客户端发送给服务器的消息转发给所有其他客户端,本系统是一个基本的实时通信模型

    1、广播服务器

     twisted基于传输层TCP编程时,直接针对Twisted的Protocol,Factory等类进行编程,而无需操作socket里面的bind,send,receive等基本用语,并且定义他们的子类并重写connectionMode,dataReceived进行事件化的TCP变成风格

    • connectionMode() 当链接建立时由twisted框架调用,本函数主要作用是在系统中注册该链接,方便调用
    • dataReceived() 当收到客户端数据时由twisted框架调用
    • connectionLost()当链接断开时由框架调用,实际应用中用来清理链接占用的资源 

    本广播系统的Protocol代码为:

    from twisted.internet.protocol import Protocol
    
    clients = [] # 保存所有客户端连接
    
    
    class Spreader (Protocol):  # 继承Protocol,实现需要重载的方法
        def __init__(self, factory):
            self.factory = factory
    
        def connectionMade(self): # 对连接的客户端计数,并保存到clients列表中
            self.factory.numProtocols = self.factory.numProtocols + 1
            self.transport.write (("欢迎来到Spread site,你是第%d 个客户端用户!
    " % (self.factory.numProtocols)).encode ('utf8'))
            print ("新连接: %d" % self.factory.numProtocols)
            clients.append (self)
    
        def connectionLost(self, reason): # 执行反向操作
            self.factory.numProtocols = self.factory.numProtocols - 1
            clients.remove (self)
            print ("关闭连接:%d" % self.factory.numProtocols)
    
        def dataReceived(self, data):  # 循环当前clients列表中的所有客户端,将受到的数据,分发给自己之外的所有客户端
            if data == 'close':  # 如果受到客户端发来的数据‘close’ ,则主动关闭与该客户端的链接
                self.transport.lostConnection ()
                for client in clients:
                    if client != self:
                        client.transport.write (data)

    开发Factory子类

    Factory类起到了 对Protocol 类的管理作用,当有新的客户端链接时,框架调用Factory.buildProtocol(),可以创建Protoco子类的实例,代码如下

    from twisted.internet.protocol import Factory
    
    from twisted.internet.endpoints import TCP4ServerEndpoint
    from twisted.internet import reactor
    
    class SpreaderFactory(Factory):
        def __init__(self):
            self.numProtocols = 0
    
        def buildProtocol(self, addr):
    
            return Spreader(self)
    
    # 8007是本服务器的监听端口
    endpoint = TCP4ServerEndpoint(reactor,8007)
    endpoint.listen(SpreaderFactory())
    
    reactor.run() # 挂起运行

    2、广播客户端

    实现与服务器程序相匹配的TCP客户端程序

    from twisted.internet.protocol import Protocol, ClientFactory
    
    from twisted.internet import reactor
    import threading
    import fileinput
    import time
    import sys
    import datetime
    
    class Echo (Protocol):  # Protocol子类,此处进行通信逻辑开发
        def __init__(self):
            self.connected = False  # 在函数routine中使用该状态决定是否向服务器发送消息
    
        def connectionMade(self):
            self.connected = True
    
        def connectionLost(self, reason):
            self.connected = False
    
        def dataReceived(self, data):
            print (data.decode ('utf8'))
    
    class EchoClientFactory (ClientFactory):  # Factory子类,管理链接关系
        def __init__(self):
            self.protocol = None
    
        def startedConnecting(self, connector):
            print ("开始链接")
    
        def buildProtocol(self, addr):
            print ('已连接')
            self.protocol = Echo ()
            return self.protocol
    
        def clientConnectionLost(self, connector, reason):
            print ("链接丢失,原因是:", reason)
    
        def clientConnectionFailed(self, connector, reason):
            print ('链接失败,原因是:', reason)
    
    bStop = False
    
    def routine(factory):   # 每隔5秒向服务器发送消息
        while not bStop:
            if factory.protocol and factory.protocol.connected:
                factory.protocol.transport.write(('hello,i am xx %s' % (datetime.datetime.now())).encode('utf8'))
            time.sleep(5)
    
    host = '127.0.0.1'
    port = 8007
    factory = EchoClientFactory()
    reactor.connectTCP(host,port,factory)
    threading.Thread(target=routine,args=(factory,)).start()   # 启动县城运行routine()函数
    reactor.run()  # 挂起运行
    bStop = True # 通知routine线程退出

     结果:

    server:

    客户端:

  • 相关阅读:
    vue-fullcalendar插件
    iframe 父框架调用子框架的函数
    关于调试的一点感想
    hdfs 删除和新增节点
    hadoop yarn 实战错误汇总
    Ganglia 安装 No package 'ck' found
    storm on yarn(CDH5) 部署笔记
    spark on yarn 安装笔记
    storm on yarn安装时 提交到yarn失败 failed
    yarn storm spark
  • 原文地址:https://www.cnblogs.com/Erick-L/p/7084810.html
Copyright © 2011-2022 走看看