zoukankan      html  css  js  c++  java
  • [原]tornado源码分析系列(四)[buffer事件类IOStream]

    引言:在分析了部分IOLoop,了解了其工作原理后,就可以看看建立在IOLoop上层的IOStream。IOStream主要提供的功能就是异步的读写操作。

    IOStream提供的接口有几个:

    1.read_bytes(bytes,callback)

    这个接口就是在有固定的字节的数据到来的时候调用回调函数

    2.read_until(delimiter,callback)

    这个接口的作用是在读取到固定的字符序列结尾后调用回调函数 callback

    3.write(data)

    异步写,就是将数据拷贝到应用层的缓冲区,由IOLoop下层统一调度

    看看源码的Demo

            from tornado import ioloop
            from tornado import iostream
            import socket
    
    	#当connect()函数完成后,调用本函数
    	#本函数在非阻塞异步写 write 后
    	#异步读取以\r\n\r\n 结尾的数据,并调用回调 on_headers
            def send_request():
                stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
                stream.read_until("\r\n\r\n", on_headers)
    	#当读取到以\r\n\r\n结尾的数据的时候,触发 on_headers回调函数,
    	#并传入参数 data(次data数据以\r\n\r\n结尾) 到 on_headers中 
            def on_headers(data):
                headers = {}
                for line in data.split("\r\n"):
                   parts = line.split(":")
                   if len(parts) == 2:
                       headers[parts[0].strip()] = parts[1].strip()
            #读取完header后,根据读取到的length 触发 on_body 函数,整个过程都是异步的
                stream.read_bytes(int(headers["Content-Length"]), on_body)
    
            def on_body(data):
                print data
                stream.close()
                ioloop.IOLoop.instance().stop()
    
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            #创建一个流的实例,绑定描述符 s
            stream = iostream.IOStream(s)
            #连接并放置相应的回调 send_request,这里后面的操作都是纯异步实现的
            #connect完成后触发 send_request的调用
            stream.connect(("friendfeed.com", 80), send_request)
            ioloop.IOLoop.instance().start()
    

    如代码所示,上述过程全部是异步的,这也是为什么tornado比其他同类python的开源框架快的原因

    IOStream类的源码注释:

    class IOStream(object):
        """A utility class to write to and read from a non-blocking socket.
    
        We support three methods: write(), read_until(), and read_bytes().
        All of the methods take callbacks (since writing and reading are
        non-blocking and asynchronous). read_until() reads the socket until
        a given delimiter, and read_bytes() reads until a specified number
        of bytes have been read from the socket.
    
        The socket parameter may either be connected or unconnected.  For
        server operations the socket is the result of calling socket.accept().
        For client operations the socket is created with socket.socket(),
        and may either be connected before passing it to the IOStream or
        connected with IOStream.connect.
    
        A very simple (and broken) HTTP client using this class:
    
    				
            from tornado import ioloop
            from tornado import iostream
            import socket
    
    				#当connect()函数完成后,调用本函数
    				#本函数在非阻塞异步写 write 后
    				#异步读取以\r\n\r\n 结尾的数据,并调用回调 on_headers
            def send_request():
                stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
                stream.read_until("\r\n\r\n", on_headers)
    				#当读取到以\r\n\r\n结尾的数据的时候,触发 on_headers回调函数,
    				#并传入参数 data(次data数据以\r\n\r\n结尾) 到 on_headers中 
            def on_headers(data):
                headers = {}
                for line in data.split("\r\n"):
                   parts = line.split(":")
                   if len(parts) == 2:
                       headers[parts[0].strip()] = parts[1].strip()
            #读取完header后,根据读取到的length 触发 on_body 函数,整个过程都是异步的
                stream.read_bytes(int(headers["Content-Length"]), on_body)
    
            def on_body(data):
                print data
                stream.close()
                ioloop.IOLoop.instance().stop()
    
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            #创建一个流的实例,绑定描述符 s
            stream = iostream.IOStream(s)
            #连接并放置相应的回调 send_request,这里后面的操作都是纯异步实现的
            #connect完成后触发 send_request的调用
            stream.connect(("friendfeed.com", 80), send_request)
            ioloop.IOLoop.instance().start()
    
        """
        def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                     read_chunk_size=4096):
            self.socket = socket
            self.socket.setblocking(False)
            #
            self.io_loop = io_loop or ioloop.IOLoop.instance()
            self.max_buffer_size = max_buffer_size
            self.read_chunk_size = read_chunk_size
            #collections 是一个内部类,提供一些抽象类操作
            self._read_buffer = collections.deque()
            self._write_buffer = collections.deque()
            
            self._write_buffer_frozen = False
            self._read_delimiter = None
            self._read_bytes = None
            #读写调用的回调函数
            self._read_callback = None
            self._write_callback = None
            self._close_callback = None
            self._connect_callback = None
            self._connecting = False
            self._state = self.io_loop.ERROR
            #注册次流buffer的 callback 到IOLoop 中
            #self._handle_events 就是此 fd 的回调
            #相应的事件类型就是 self._state
            #一般情况下这个状态是读和写都会回调统一的
            # self._handle_events()
            with stack_context.NullContext():
                self.io_loop.add_handler(
                    self.socket.fileno(), self._handle_events, self._state)
    
        def connect(self, address, callback=None):
            """Connects the socket to a remote address without blocking.
    
            May only be called if the socket passed to the constructor was
            not previously connected.  The address parameter is in the
            same format as for socket.connect, i.e. a (host, port) tuple.
            If callback is specified, it will be called when the
            connection is completed.
    
            Note that it is safe to call IOStream.write while the
            connection is pending, in which case the data will be written
            as soon as the connection is ready.  Calling IOStream read
            methods before the socket is connected works on some platforms
            but is non-portable.
            """
            self._connecting = True
            try:
                self.socket.connect(address)
            except socket.error, e:
                # In non-blocking mode connect() always raises an exception
                if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
                    raise
            self._connect_callback = stack_context.wrap(callback)
            self._add_io_state(self.io_loop.WRITE)
    
        def read_until(self, delimiter, callback):
            """Call callback when we read the given delimiter."""
            assert not self._read_callback, "Already reading"
            self._read_delimiter = delimiter
            self._read_callback = stack_context.wrap(callback)
            while True:
                # See if we've already got the data from a previous read
                if self._read_from_buffer():
                    return
                self._check_closed()
                if self._read_to_buffer() == 0:
                    break
            self._add_io_state(self.io_loop.READ)
    
        def read_bytes(self, num_bytes, callback):
            """Call callback when we read the given number of bytes."""
            assert not self._read_callback, "Already reading"
            if num_bytes == 0:
                callback("")
                return
            self._read_bytes = num_bytes
            self._read_callback = stack_context.wrap(callback)
            while True:
                if self._read_from_buffer():
                    return
                self._check_closed()
                if self._read_to_buffer() == 0:
                    break
            self._add_io_state(self.io_loop.READ)
    		#异步write 很简单,在IOStream层只需要将数据append 到本地buffer
    		#注意不是append 到socket的buffer
    		#设置回调函数
    		#在异步的写的过程中,遵循的原则就是能写多少就写多少
    		
        def write(self, data, callback=None):
            """Write the given data to this stream.
    
            If callback is given, we call it when all of the buffered write
            data has been successfully written to the stream. If there was
            previously buffered write data and an old write callback, that
            callback is simply overwritten with this new callback.
            """
            self._check_closed()
            #直接将数据append 到_wriet_buffer 中
            self._write_buffer.append(data)
            #设置状态为监听可写的事件的时候将数据发出去
            #下面的工作是由IOLoop层来完成的
            self._add_io_state(self.io_loop.WRITE)
            #注册回调函数上去
            self._write_callback = stack_context.wrap(callback)
    
        def set_close_callback(self, callback):
            """Call the given callback when the stream is closed."""
            self._close_callback = stack_context.wrap(callback)
    
        def close(self):
            """Close this stream."""
            if self.socket is not None:
                self.io_loop.remove_handler(self.socket.fileno())
                self.socket.close()
                self.socket = None
                if self._close_callback:
                    self._run_callback(self._close_callback)
    
        def reading(self):
            """Returns true if we are currently reading from the stream."""
            return self._read_callback is not None
    
        def writing(self):
            """Returns true if we are currently writing to the stream."""
            return bool(self._write_buffer)
    
        def closed(self):
            return self.socket is None
    
    		#_handle_events 作为上层IOLoop的add_handle()的参数,传递到IOLoop中检测事件发生
    		#后回调此函数,READ , WRIET , ERROR 事件都采用这个统一的回调
        def _handle_events(self, fd, events):
            if not self.socket:
                logging.warning("Got events for closed stream %d", fd)
                return
            #根据相应的状态(事件类型)来统一的调度不同的回调函数
            #一般情况下就是调用 read , write
            try:
            		#处理注册在此描述符上的读事件
                if events & self.io_loop.READ:
                    self._handle_read()
                if not self.socket:
                    return
                #epoll第一阶段调用回调的过程中,会将事件类型和fd统一的传入函数
                #见 IOLoop.py 中 start()函数
                #注意这里的 connecting 标志位
                #在服务端的话是不需要此标志位的
                #可读事件就是回调 _handle_wriet()
                if events & self.io_loop.WRITE:
                    if self._connecting:
                        self._handle_connect()
                    self._handle_write()
                if not self.socket:
                    return
                #出错处理
                if events & self.io_loop.ERROR:
                    self.close()
                    return
                state = self.io_loop.ERROR
                if self.reading():
                    state |= self.io_loop.READ
                if self.writing():
                    state |= self.io_loop.WRITE
                if state != self._state:
                    self._state = state
                    self.io_loop.update_handler(self.socket.fileno(), self._state)
            except:
                logging.error("Uncaught exception, closing connection.",
                              exc_info=True)
                self.close()
                raise
    
        def _run_callback(self, callback, *args, **kwargs):
            try:
                # Use a NullContext to ensure that all StackContexts are run
                # inside our blanket exception handler rather than outside.
                with stack_context.NullContext():
                    callback(*args, **kwargs)
            except:
                logging.error("Uncaught exception, closing connection.",
                              exc_info=True)
                # Close the socket on an uncaught exception from a user callback
                # (It would eventually get closed when the socket object is
                # gc'd, but we don't want to rely on gc happening before we
                # run out of file descriptors)
                self.close()
                # Re-raise the exception so that IOLoop.handle_callback_exception
                # can see it and log the error
                raise
    		#这个函数是在_handle_events中被调用的
    		#当描述符可读后,统一采取统一的方式读取
    		#此时的 fd 作为类的全局变量,所以在调用 _read_to_buffer()的时候不需要传递这个参数
    		#_read_to_buffer() 是类方法,将从 socket 中读取的数据读入 buffer 中
    		#其中_read_to_buffer() 会调用 _read_from_socket 然后将数据转储到 buffer中统一管理
        def _handle_read(self):
            while True:
                try:
                    # Read from the socket until we get EWOULDBLOCK or equivalent.
                    # SSL sockets do some internal buffering, and if the data is
                    # sitting in the SSL object's buffer select() and friends
                    # can't see it; the only way to find out if it's there is to
                    # try to read it.
                    #从socket读取数据到本地buffer
                    #此同步调用其实是因为在这里的上层函数 _handle_read是异步被回调的
                    #所以这里能够确定此时的 socket 是可读的
                    result = self._read_to_buffer()
                except Exception:
                    self.close()
                    return
                if result == 0:
                    break
                else:
                		#在处理完数据并将数据通过一系列处理后,调用_read_from_buffer()将数据从buffer
                		#中取出,然后回调 read_bytes()或者是 read_untils() 中的回调函数
                    if self._read_from_buffer():
                        return
    		#从socket 读取数据到 chunk
        def _read_from_socket(self):
            """Attempts to read from the socket.
    
            Returns the data read or None if there is nothing to read.
            May be overridden in subclasses.
            """
            try:
                chunk = self.socket.recv(self.read_chunk_size)
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return None
                else:
                    raise
            if not chunk:
                self.close()
                return None
            return chunk
    		#_read_to_buffer 将调用 _read_from_socket中得到的数据chunk 加入到buffer中,此buffer
    		#是接收buffer,即 _read_buffer
        def _read_to_buffer(self):
            """Reads from the socket and appends the result to the read buffer.
    
            Returns the number of bytes read.  Returns 0 if there is nothing
            to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
            error closes the socket and raises an exception.
            """
            try:
                chunk = self._read_from_socket()
            except socket.error, e:
                # ssl.SSLError is a subclass of socket.error
                logging.warning("Read error on %d: %s",
                                self.socket.fileno(), e)
                self.close()
                raise
            if chunk is None:
                return 0
            #将 chunk 数据append 到 _read_buffer
            self._read_buffer.append(chunk)
            if self._read_buffer_size() >= self.max_buffer_size:
                logging.error("Reached maximum read buffer size")
                self.close()
                raise IOError("Reached maximum read buffer size")
            return len(chunk)
    		
    		#_read_from_buffer 是根据两个判断标准来确定需要读取多少数据
    		# 一 是根据结尾符号比如说 "\r\n\r\n" 作为结尾
    		# 二 是根据需要读取的字符数为依据
        def _read_from_buffer(self):
            """Attempts to complete the currently-pending read from the buffer.
    
            Returns True if the read was completed.
            """
            #以需要读取的字符数为标准读取数据
            if self._read_bytes:
                if self._read_buffer_size() >= self._read_bytes:
                    num_bytes = self._read_bytes
                    callback = self._read_callback
                    self._read_callback = None
                    self._read_bytes = None
                    #调用回调函数
                    self._run_callback(callback, self._consume(num_bytes))
                    return True
            #判断字符结尾符号,以此作为读取的结束符
            elif self._read_delimiter:
                _merge_prefix(self._read_buffer, sys.maxint)
                loc = self._read_buffer[0].find(self._read_delimiter)
                if loc != -1:
                    callback = self._read_callback
                    delimiter_len = len(self._read_delimiter)
                    self._read_callback = None
                    self._read_delimiter = None
                    #同样的是调用注册在此buffer 上的描述符
                    self._run_callback(callback,
                                       self._consume(loc + delimiter_len))
                    return True
            return False
    
        def _handle_connect(self):
            if self._connect_callback is not None:
                callback = self._connect_callback
                self._connect_callback = None
                self._run_callback(callback)
            self._connecting = False
    		#_handle_write 也是作为 _handle_events里面处理可写事件的函数
        def _handle_write(self):
            while self._write_buffer:
                try:
                    if not self._write_buffer_frozen:
                        # On windows, socket.send blows up if given a
                        # write buffer that's too large, instead of just
                        # returning the number of bytes it was able to
                        # process.  Therefore we must not call socket.send
                        # with more than 128KB at a time.
                        _merge_prefix(self._write_buffer, 128 * 1024)
                    #将buffer链的数据一并发出去
                    num_bytes = self.socket.send(self._write_buffer[0])
                    self._write_buffer_frozen = False
                    _merge_prefix(self._write_buffer, num_bytes)
                    self._write_buffer.popleft()
                except socket.error, e:
                    if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                        # With OpenSSL, after send returns EWOULDBLOCK,
                        # the very same string object must be used on the
                        # next call to send.  Therefore we suppress
                        # merging the write buffer after an EWOULDBLOCK.
                        # A cleaner solution would be to set
                        # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
                        # not yet accessible from python
                        # (http://bugs.python.org/issue8240)
                        #这里的套接口写过程当中如果遇到了不可写的情况,会先停止
                        #但是此时未发送的buffer 的数据任然在,所以不会导致数据丢失
                        #因为在发送的时候是有多少发送多少,因为在负责不大的情况下,套接口
                        #是一直可写的,所以 epoll返回的都是可写状态
                        self._write_buffer_frozen = True
                        break
                    else:
                        logging.warning("Write error on %d: %s",
                                        self.socket.fileno(), e)
                        self.close()
                        return
            #如果有 wriet_callback 则调用相应的回调函数
            if not self._write_buffer and self._write_callback:
                callback = self._write_callback
                self._write_callback = None
                self._run_callback(callback)
    
        def _consume(self, loc):
            _merge_prefix(self._read_buffer, loc)
            return self._read_buffer.popleft()
    
        def _check_closed(self):
            if not self.socket:
                raise IOError("Stream is closed")
    
        def _add_io_state(self, state):
            if self.socket is None:
                # connection has been closed, so there can be no future events
                return
            if not self._state & state:
                self._state = self._state | state
                self.io_loop.update_handler(self.socket.fileno(), self._state)
    
        def _read_buffer_size(self):
            return sum(len(chunk) for chunk in self._read_buffer)
    

    调用过程如下所示  

      

    结论:由于时间原因,没有将整个过程叙述的很明白,请谅解。

    文章属原创,转载请注明出处 联系作者: Email:zhangbolinux@sina.com QQ:513364476
  • 相关阅读:
    element 步骤条steps 点击事件
    element-ui的rules中正则表达式
    从master分支创建自己的分支
    2.1 系统调用io实现原理
    2-3形参和实参
    2-2函数
    2-1.编译和链接
    linux高编信号-------setitimer()、getitimer()
    linux高编IO-------有限状态机编程原理(mycpy)
    linux高编线程-------线程同步-条件变量
  • 原文地址:https://www.cnblogs.com/Bozh/p/2600520.html
Copyright © 2011-2022 走看看