zoukankan      html  css  js  c++  java
  • python Asyncore.dispatcher 理解

    1、Asyncore是python的标准库。Asyncore.dispatcher 是这个库中的一个socket的框架,为socket添加了一些通用的回调方法,比如:
    def listen(self, num):
    
    
        def bind(self, addr):
    
    
        def connect(self, address):
    
    
        def accept(self):
    
    
        def send(self, data):
    
    
        def recv(self, buffer_size):
    
    
        def close(self):
    
      def handle_read(self):
            self.log_info('unhandled read event', 'warning')
    
        def handle_write(self):
            self.log_info('unhandled write event', 'warning')
    
        def handle_connect(self):
            self.log_info('unhandled connect event', 'warning')
    
        def handle_accept(self):
            self.log_info('unhandled accept event', 'warning')
    
        def handle_close(self):
            self.log_info('unhandled close event', 'warning')
            self.close()

    2、Asyncore.dispatcher 对socket进行了一次封装之后,这个socket就可以添加到一个全局的字典中,Asyncore模块中的loop函数会对这些socket文件描述符进行监视,具体可能是select或者poll方法实现,windows上面是poll实现的。通过I/O多路复用实现同时处理多个请求,避免使用多线程或者多进程的方式。下面中loop中传入的map就是socket-dispatcher的对象集合,当发现对应的socket有可读可写的请求时,调用对应的handle_read 和handle_write方法。

    def poll(timeout=0.0, map=None):
        if map is None:
            map = socket_map
        if map:
            r = []; w = []; e = []
            for fd, obj in map.items():
                is_r = obj.readable()
                is_w = obj.writable()
                if is_r:
                    r.append(fd)
                # accepting sockets should not be writable
                if is_w and not obj.accepting:
                    w.append(fd)
                if is_r or is_w:
                    e.append(fd)
            if [] == r == w == e:
                time.sleep(timeout)
                return
    
            try:
                r, w, e = select.select(r, w, e, timeout)
            except select.error, err:
                if err.args[0] != EINTR:
                    raise
                else:
                    return
    
            for fd in r:
                obj = map.get(fd)
                if obj is None:
                    continue
                read(obj)
    
            for fd in w:
                obj = map.get(fd)
                if obj is None:
                    continue
                write(obj)
    
            for fd in e:
                obj = map.get(fd)
                if obj is None:
                    continue
                _exception(obj)
    
    def poll2(timeout=0.0, map=None):
        # Use the poll() support added to the select module in Python 2.0
        if map is None:
            map = socket_map
        if timeout is not None:
            # timeout is in milliseconds
            timeout = int(timeout*1000)
        pollster = select.poll()
        if map:
            for fd, obj in map.items():
                flags = 0
                if obj.readable():
                    flags |= select.POLLIN | select.POLLPRI
                # accepting sockets should not be writable
                if obj.writable() and not obj.accepting:
                    flags |= select.POLLOUT
                if flags:
                    # Only check for exceptions if object was either readable
                    # or writable.
                    flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
                    pollster.register(fd, flags)
            try:
                r = pollster.poll(timeout)
            except select.error, err:
                if err.args[0] != EINTR:
                    raise
                r = []
            for fd, flags in r:
                obj = map.get(fd)
                if obj is None:
                    continue
                readwrite(obj, flags)
    
    poll3 = poll2                           # Alias for backward compatibility
    
    def loop(timeout=30.0, use_poll=False, map=None, count=None):
        if map is None:
            map = socket_map
    
        if use_poll and hasattr(select, 'poll'):
            poll_fun = poll2
        else:
            poll_fun = poll
    
        if count is None:
            while map:
                poll_fun(timeout, map)
    
        else:
            while map and count > 0:
                poll_fun(timeout, map)
                count = count – 1

     

    3、如何实现一个异步的服务器,官方给出了一个比较好的代码:

    import logging
    import asyncore
    import socket
     
    logging.basicConfig(level=logging.DEBUG, format="%(created)-15s %(msecs)d %(levelname)8s %(thread)d %(name)s %(message)s")
    log                     = logging.getLogger(__name__)
     
    BACKLOG                 = 5
    SIZE                    = 1024
     
    class EchoHandler(asyncore.dispatcher):
     
        def __init__(self, conn_sock, client_address, server):
            self.server             = server
            self.client_address     = client_address
            self.buffer             = ""
     
            # We dont have anything to write, to start with
            self.is_writable        = False
     
            # Create ourselves, but with an already provided socket
            asyncore.dispatcher.__init__(self, conn_sock)
            log.debug("created handler; waiting for loop")
     
        def readable(self):
            return True     # We are always happy to read
     
        def writable(self):
            return self.is_writable # But we might not have
                                    # anything to send all the time
     
        def handle_read(self):
            log.debug("handle_read")
            data = self.recv(SIZE)
            log.debug("after recv")
            if data:
                log.debug("got data")
                self.buffer += data
                self.is_writable = True  # sth to send back now
            else:
                log.debug("got null data")
     
        def handle_write(self):
            log.debug("handle_write")
            if self.buffer:
                sent = self.send(self.buffer)
                log.debug("sent data")
                self.buffer = self.buffer[sent:]
            else:
                log.debug("nothing to send")
            if len(self.buffer) == 0:
                self.is_writable = False
     
        # Will this ever get called?  Does loop() call
        # handle_close() if we called close, to start with?
        def handle_close(self):
            log.debug("handle_close")
            log.info("conn_closed: client_address=%s:%s" % 
                         (self.client_address[0],
                          self.client_address[1]))
            self.close()
            #pass
     
    class EchoServer(asyncore.dispatcher):
     
        allow_reuse_address         = False
        request_queue_size          = 5
        address_family              = socket.AF_INET
        socket_type                 = socket.SOCK_STREAM
     
        def __init__(self, address, handlerClass=EchoHandler):
            self.address            = address
            self.handlerClass       = handlerClass
     
            asyncore.dispatcher.__init__(self)
            self.create_socket(self.address_family,
                                   self.socket_type)
     
            if self.allow_reuse_address:
                self.set_reuse_addr()
     
            self.server_bind()
            self.server_activate()
     
        def server_bind(self):
            self.bind(self.address)
            log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))
     
        def server_activate(self):
            self.listen(self.request_queue_size)
            log.debug("listen: backlog=%d" % self.request_queue_size)
     
        def fileno(self):
            return self.socket.fileno()
     
        def serve_forever(self):
            asyncore.loop()
     
        # TODO: try to implement handle_request()
     
        # Internal use
        def handle_accept(self):
            (conn_sock, client_address) = self.accept()
            if self.verify_request(conn_sock, client_address):
                self.process_request(conn_sock, client_address)
     
        def verify_request(self, conn_sock, client_address):
            return True
     
        def process_request(self, conn_sock, client_address):
            log.info("conn_made: client_address=%s:%s" % 
                         (client_address[0],
                          client_address[1]))
            self.handlerClass(conn_sock, client_address, self)
     
        def handle_close(self):
            self.close()

    使用示例:

    interface = "0.0.0.0"
    port = 8080
    server = asyncore_echo_server.EchoServer((interface, port))
    server.serve_forever()

    代码中创建了两个dispatcher的子类,一个是server专门用于接受客户端的请求;一个是EchoHandler,用于收到请求的时候处理客户端的每一个请求。

    3、Asyncore.dispatcher 子类中的server和普通的Handler有什么区别?

    区别只有一个地方,dispatcher初始化的时候,server会先创建一个socket,binding本地的地址和端口,然后listening,在listening的过程中会执行下列代码

    def listen(self, num):
            self.accepting = True
            if os.name == 'nt' and num > 5:
                num = 5
            return self.socket.listen(num)

    当self.accepting = True 之后,客户端所有的请求会当成accept处理,也就是收到一个请求,得到一个socket。如果这个时候不把这个socket传给handler,那么这个socket因为没有人保存会立即释放,导致socket关闭。

  • 相关阅读:
    mac Navicat连接Oracle报错ORA-21561: OID generation failed
    svn: E230001: Server SSL certificate verification failed: certificate issued
    mac删除系统应用出现mac Read-Only filesystem
    spring boot项目03:阅读启动源码
    spring boot项目02:Web项目(基础)
    spring boot项目01:非Web项目(基础)
    idea 单独引入jar_Iidea 单独引入jar_Intellij IDEA 添加jar包的三种方式ntellij IDEA 添加jar包的三种方式
    java输出pdf的依赖包,非maven,包名:spire.pdf.jar 下载
    IDEA Error:java: 无效的源发行版: 11错误
    SpringBoot官网以下载模板方式创建
  • 原文地址:https://www.cnblogs.com/doudouyoutang/p/4541369.html
Copyright © 2011-2022 走看看