zoukankan      html  css  js  c++  java
  • tornado源码分析-iostream

    tornado源码分析-iostream
    1.iostream.py作用
    用来异步读写文件,socket通信

    2.使用示例

    import tornado.ioloop
    import tornado.iostream
    import socket
    
    def send_request():
        stream.write(b'GET / HTTP/1.1
    Host: www.sina.com.cn
    Connection: close
    
    ')
        stream.read_until(b"
    
    ", on_headers)
    
    def on_headers(data):
        headers = {}
        for line in data.split(b"
    "):
            parts = line.split(b":")
            if len(parts) == 2:
                headers[parts[0]] = parts[1]
        stream.read_bytes(int(headers[b"Content-Length"]), on_body)
    
    def on_body(data):
        print(data)
        with open('sina.html', 'wb') as f:
            f.write(data)
        stream.close()
        tornado.ioloop.IOLoop.current().stop()
    
    
    if __name__ == '__main__':
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        stream = tornado.iostream.IOStream(client_socket)
        stream.connect(("www.sina.com.cn", 80), send_request)
        tornado.ioloop.IOLoop.current().start()
    

    与传统socket通信的区别是,iostream对socket进行了包装,可以进行异步的读写。

    3.iostream主要功能

    class IOStream(BaseIOStream):
    	def read_until(self, delimiter, callback):
    		future = self._set_read_callback(callback)
            self._read_delimiter = delimiter
            self._read_max_bytes = max_bytes
            try:
                self._try_inline_read()
    		except:
    			...
    	def read_bytes(self, num_bytes, callback, streaming_callback=None)
    		...
    	def read_until_regex(self, regex, callback):
    		...
    	def read_until_close(self, callback, streaming_callback=None):
    		...
    	def write(self, data, callback=None):
    		...
    	
    	def _handle_events(self, fd, events):
    		...
        def _add_io_state(self, state):
    		...
            if self._state is None:
                self._state = ioloop.IOLoop.ERROR | state
                with stack_context.NullContext():
                    self.io_loop.add_handler(
                        self.fileno(), self._handle_events, self._state)
            elif not self._state & state:
                self._state = self._state | state
                self.io_loop.update_handler(self.fileno(), self._state)
    	def connect(self, address, callback=None, server_hostname=None):
    		...
    
        def _try_inline_read(self):
    		# See if we've already got the data from a previous read
            self._run_streaming_callback()
    	    pos = self._find_read_pos()
    	    if pos is not None:
    	        self._read_from_buffer(pos)
    	        return
    	    self._check_closed()
    	    try:
    	        pos = self._read_to_buffer_loop()
    	    except Exception:
    	       ...
    	    if pos is not None:
    	        self._read_from_buffer(pos)
    	        return
    	    if self.closed():
    	        self._maybe_run_close_callback()
    	    else:
    	        self._add_io_state(ioloop.IOLoop.READ)
       	
    	def _read_from_buffer(self, pos):
            self._read_bytes = self._read_delimiter = self._read_regex = None
            self._read_partial = False
            self._run_read_callback(pos, False)
    
        def _run_read_callback(self, size, streaming):
            if streaming:
                callback = self._streaming_callback
            else:
                callback = self._read_callback
                self._read_callback = self._streaming_callback = None
                if self._read_future is not None:
                    assert callback is None
                    future = self._read_future
                    self._read_future = None
                    future.set_result(self._consume(size))
            if callback is not None:
                assert (self._read_future is None) or streaming
                self._run_callback(callback, self._consume(size))
            else:
                self._maybe_add_error_listener()
    
    	def _consume(self, loc):
    		...
    

    read_until:读到delimiter结束
    read_bytes:读取num_bytes个字符后结束
    read_until_regex:读取到正则匹配到后结束
    read_until_close:读取到scoket关闭后结束
    读流程:
    self._run_streaming_callback()
    字节流回调,待研究

    future = self._set_read_callback(callback)
    注册读完成回调事件

    pos = self._find_read_pos()
    读取位置

    self._read_from_buffer(pos)
    从缓存读取

    pos = self._read_to_buffer_loop()
    socket等待接收数据,放入缓存
    self._read_from_buffer(pos)
    从缓存读取

    self._consume(size)
    读缓冲器size个字节,从缓冲区删除并返回这些数据

    _maybe_add_error_listener
    没有可读数据,开启监听read事件,当read事件发生时再调用handle_read处理

    _pending_callbacks
    控制信号量,待研究

    write:首先将data按数据块大小WRITE_BUFFER_CHUNK_SIZE分块写入write_buffer,
    然后调用handle_write向socket发送数据

    connect:建立非阻塞socket连接,注册ioloop可写事件

    _add_io_state:为ioloop注册或更新READ,WRITE,ERROR事件,ioloop异步执行读socket,写socket操作等
    handle_read:从socket读到缓冲区
    handle_write:由缓冲区向socket写

  • 相关阅读:
    如何改变Activity在当前任务堆栈中的顺序,Intent参数大全
    SQL删除重复记录,并只保留一条
    SpringCloud+Eureka+Feign+Ribbon+zuul的简化搭建流程和CRUD练习
    Spring Cloud Bus 消息总线
    Spring Cloud之Swagger集群搭建
    nginx-ZUUL集群
    spring boot swagger-ui.html 404
    jenkins 部署docker 容器 eureka 集群 完整配置 多台服务器
    Linux(Centos)之安装Nginx及注意事项
    Idea 导出Modules =>jar
  • 原文地址:https://www.cnblogs.com/shijingjing07/p/7922916.html
Copyright © 2011-2022 走看看