zoukankan      html  css  js  c++  java
  • 采用asyncore进行实时同步

    最近在维护项目的时候,发现某个实时数据同步功能非常容易失败,故静下心来彻底弄清楚该设计的实现原理,以及其中用到的python异步sockethandler : asyncore。

    实时数据同步功能的设计非常简单,用户在网页上触发某个记录的"Sync Up" button, 后台把该记录的id和type传入asyncore Client,asyncore Client把信息传入local的另一个asyncore server进程,asyncore server端调用相应的同步数据API,进行同步,并把同步的数据信息(包括同步过程的log和status)写入数据库;同时在click button的时候,会触发一个循环的js call,不断发出异步请求,获取存入的实时同步log信息,把log信息打印到页面上,更新progress bar。

     

    asyncore Client:

    class Client(asyncore.dispatcher_with_send):
        def __init__(self, host, port, message):
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connect((host, port))
            self.out_buffer = message
            self.response   = ''
    
        def handle_close(self):
            self.close()
    
        def handle_read(self):
            self.response = self.recv(1024)
            self.close()
    
    def send(command):
       c = Client('127.0.0.1', 8081, command)
       asyncore.loop()
       return c.response
    
    if __name__ == '__main__':
       send('quit')

    asyncore server

    class asyncoreServer(asyncore.dispatcher):
     
        def __init__(self, address, handlerClass=ServerHandler):
            print ">>> Server Socket init with address:%s:%s" % (address[0], address[1])        
            self.address            = address
            self.handlerClass       = handlerClass
            self.address_family = socket.AF_INET
            self.socket_type = socket.SOCK_STREAM
            self.request_queue_size = 5
            self.allow_reuse_address = True
            
     
            asyncore.dispatcher.__init__(self)
            self.create_socket(self.address_family,self.socket_type)
     
            if self.allow_reuse_address:
                self.set_reuse_addr()
     
            self.server_bind()
            self.server_activate()
     
        def server_bind(self):
            self.bind(self.address)
            log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))
            print ">>> Begin bind server"
     
        def server_activate(self):
            self.listen(self.request_queue_size)
            log.debug("listen: backlog=%d" % self.request_queue_size)
     
        def fileno(self):
            return self.socket.fileno()
     
        def serve_forever(self):
            asyncore.loop()
     
        def handle_accept(self):
            (conn_sock, client_address) = self.accept()
            print "Receive Client request socket=%s, client_address=%s:%s" % (conn_sock, client_address[0], client_address[1])
            if self.verify_request(conn_sock, client_address):
               self.process_request(conn_sock, client_address)
     
        def verify_request(self, conn_sock, client_address):
            return True
     
        def process_request(self, conn_sock, client_address):
            log.info("conn_made: client_address=%s:%s" % (client_address[0],client_address[1]))
            self.handlerClass(conn_sock, client_address, self)
     
        def handle_close(self):
            self.close()

    关于asyncore的介绍:

    asyncore库是python的一个标准库,它是一个异步socket的包装。

    asyncore提供一个方法和一个基类:loop()方法和dispatcher基类。

    每一个继承dispatcher的类的对象,都可以看做一个需要处理的Socket,可以是TCP也可以是UDP,子类override一些dispatcher的方法,主要是重写‘handle_’打头的方法,比如:

    handle_connect, handle_close,  handle_read, handle_write ...

    loop()方法负责检查一个dict, dict中保存dispatcher的实例,这个字典被称为channel。每次创建一个dispatcher对象,都会把自己加入到一个默认的dict里面去(当然也可以自己指定channel)。当对象被加入到channel中的时候,socket的行为都已经被定义好,程序只需要调用loop(),一切功能就实现了。

    reference:

    https://docs.python.org/2/library/asyncore.html

  • 相关阅读:
    python操作redis
    python正则表达式-案例
    hive序列化和反序列化serde
    python配置文件
    Java写入的常用技巧
    Java从string数组创建临时文件
    Java官方操纵byte数组的方式
    python实例方法、静态方法和类方法
    ast.literal_eval(转)
    impala学习笔记
  • 原文地址:https://www.cnblogs.com/zhq1007/p/4396206.html
Copyright © 2011-2022 走看看