zoukankan      html  css  js  c++  java
  • Tornado 高并发源码分析之四--- HTTPServer 与 TCPServer 对象

    主要工作:
    服务器启动的时候做的事:
    1、把包含了各种配置信息的 application 对象封装到了 HttpServer 对象的 request_callback 字段中,等待被调用
    2、TCPServer 通过 listen 方法启动端口监听, 封装_handle_connection回调函数,并注册到 IOLoop 中
     
    服务器运行时做的事:
    3、当有请求到来时,注册在 IOLoop 中的 _handle_connection 将会被调用, _handle_connection 方法将会调用handle_stream 方法。
    4、handle_stream 方法是由 HTTPServer 重写TCPServer的方法,它将会创建 HTTP1ServerConnection对象和_ServerRequestAdapter 对象(这里只是创建好,并没有执行),并调用 HTTP1ServerConnection 对象的start_serving 方法
    5、start_serving 方法创建创建HTTP1Connection 对象,并在方法 _server_request_loop 中异步yield conn.read_response(request_delegate) 接受请求发过来的数据, 这里传入的delegate就是在HTTPServer 中创建的_ServerRequestAdapter对象。
    4、在异步接收的时候,_ServerRequestAdapter 负责将数据封装成 HTTPRequest 对象, 接收完毕之后,调用_ServerRequestAdapter的 finish 方法
    5、在调用_ServerRequestAdapter 的 finish 方法时,数据就会调用 application 对象的 __call__ 方法, 这时就回到了 Application 类了(这里不解释,直接看Application 的 __call__ 方法)
     
     
    1、TCPServer 作为工厂类,自身只做统一的端口绑定、监听、回调函数绑定的操作
    2、HTTPServer 作为子类,实现数据接收类的创建,接受数据,最后封装成 HTTPRequest 对象,交给Application 对象
    3、在整个接收数据的过程中,并不能分辨出 url 是什么,该匹配哪个 handler, 这件事是由 Application 对象来处理的
     
    源码分析,省略部分源码,只粘贴出主要部分
     1 class HTTPServer(TCPServer, httputil.HTTPServerConnectionDelegate):
     2 #继承自TCPServer 和 httputil.HTTPServerConnectionDelegate
     3 
     4 def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
     5              xheaders=False, ssl_options=None, protocol=None,
     6              decompress_request=False,
     7              chunk_size=None, max_header_size=None,
     8              idle_connection_timeout=None, body_timeout=None,
     9              max_body_size=None, max_buffer_size=None):
    10     self.request_callback = request_callback   #获取 application对象
    11     self.no_keep_alive = no_keep_alive
    12     self.xheaders = xheaders
    13     self.protocol = protocol
    14     self.conn_params = HTTP1ConnectionParameters(   #获取基本的http 参数
    15         decompress=decompress_request,
    16         chunk_size=chunk_size,
    17         max_header_size=max_header_size,
    18         header_timeout=idle_connection_timeout or 3600,
    19         max_body_size=max_body_size,
    20         body_timeout=body_timeout)
    21     TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,   传递HTTPServer 给TCPServer对象,初始化
    22                        max_buffer_size=max_buffer_size,
    23                        read_chunk_size=chunk_size)
    24     self._connections = set()
    25 
    26 def handle_stream(self, stream, address):  
    27  #请求到来时,真正的数据处理方法
    28     context = _HTTPRequestContext(stream, address,
    29                                   self.protocol)
    30     conn = HTTP1ServerConnection(   #创建HTTP1ServerConnection对象,用来获取链接
    31         stream, self.conn_params, context)
    32     self._connections.add(conn)
    33     conn.start_serving(self)   #启动获取数据服务, 传递自己为代理
    34 
    35 def start_request(self, server_conn, request_conn): #将会在handle_stream添加进去的代理中执行
    36     return _ServerRequestAdapter(self, request_conn)   #创建_ServerRequestAdapter对象, 开始接受封装数据
    HTTPServer
     1 class _ServerRequestAdapter(httputil.HTTPMessageDelegate):
     2     """负责数据流整理,将数据最后封装成 HTTPRequest对象
     3     """
     4     def __init__(self, server, connection):
     5         self.server = server
     6         self.connection = connection
     7         self.request = None
     8         if isinstance(server.request_callback, httputil.HTTPServerConnectionDelegate):
     9             self.delegate = server.request_callback.start_request(connection)
    10             self._chunks = None
    11         else:
    12             self.delegate = None
    13             self._chunks = []
    14 
    15     def headers_received(self, start_line, headers):   #接受请求头部数据
    16         if self.server.xheaders:
    17             self.connection.context._apply_xheaders(headers)
    18         if self.delegate is None:
    19             self.request = httputil.HTTPServerRequest(  #创建HTTPServerRequest对象
    20                 connection=self.connection, start_line=start_line,
    21                 headers=headers)
    22         else:
    23             return self.delegate.headers_received(start_line, headers)
    24 
    25     def data_received(self, chunk):  #不停的接受数据,放到_chunks里面
    26         if self.delegate is None:
    27             self._chunks.append(chunk)
    28         else:
    29             return self.delegate.data_received(chunk)
    30 
    31     def finish(self):  #request_callback本身就是一个application对象,在加上括号,那就是执行这个对象,也是就是调用application 的 __call__方法,并传入了封装好的request对象
    32         if self.delegate is None:
    33             self.request.body = b''.join(self._chunks)
    34             self.request._parse_body()
    35             self.server.request_callback(self.request)
    36         else:
    37             self.delegate.finish()
    38         self._cleanup()
    _ServerRequestAdapter
     1 TCPServer 启动一个 Socket 服务端口监听有三中方式
     2 1. `listen`: 单进程启动一个监听
     3         server = TCPServer()
     4         server.listen(8888)
     5         IOLoop.instance().start()
     6 
     7 2. `bind`/`start`: 多进程启动一个监听,取决于start(1)的进程数 
     8         server = TCPServer()
     9         server.bind(8888)
    10         server.start(0)  # Forks multiple sub-processes
    11         IOLoop.instance().start()
    12 
    13 3. `add_sockets`: 另外一种方式启动多进程
    14         sockets = bind_sockets(8888)
    15         tornado.process.fork_processes(0)
    16         server = TCPServer()
    17         server.add_sockets(sockets)
    18         IOLoop.instance().start()
    19 
    20 class TCPServer(object):
    21 
    22 def listen(self, port, address=""):   #单进程启动
    23     sockets = bind_sockets(port, address=address)    #绑定 socket 监听端口
    24     self.add_sockets(sockets)      #添加监听
    25 
    26 def add_sockets(self, sockets):
    27     if self.io_loop is None:
    28         self.io_loop = IOLoop.current()  #获取 IOLoop 单例
    29     for sock in sockets:
    30         self._sockets[sock.fileno()] = sock
    31  #将封装好的 self._handle_connection 回调函数与sock对象一起绑定到IOLoop 中, 这一步很重要,_handle_connection 是封装之后的回调函数,在 IOLoop 中会回调它
    32         add_accept_handler(sock, self._handle_connection,   io_loop=self.io_loop)  #add_accept_handler方法封装在netutil 文件中
    33 
    34 def _handle_connection(self, connection, address):
    35  #创建IOStream对象,并封装回调函数, 这个方法并不会在这里执行,只是在这里封装好之后,被当做参数传递与IOLoop 绑定 .......
    36     try:
    37         if self.ssl_options is not None:   #https 相关
    38             stream = SSLIOStream(connection, io_loop=self.io_loop,
    39                                  max_buffer_size=self.max_buffer_size,
    40                                  read_chunk_size=self.read_chunk_size)
    41         else:
    42             stream = IOStream(connection, io_loop=self.io_loop,    #创建一个 IOStream, 主要用来读取HTTP 数据流
    43                               max_buffer_size=self.max_buffer_size,
    44                               read_chunk_size=self.read_chunk_size)
    45  #调用继承至 TCPServer的类的handle_stream方法(也就是HTTPServer 的handle_stream 方法),传入IOStream和address, 
    46         self.handle_stream(stream, address)  
    47     except Exception:
    48         app_log.error("Error in connection callback", exc_info=True)
    49 
    50 #*******下面两个方法是用于多进程的方式启动 TCPServer 可以不看********
    51 def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128):
    52         #绑定socket
    53     sockets = bind_sockets(port, address=address, family=family,
    54                            backlog=backlog)
    55     if self._started:
    56         self.add_sockets(sockets)
    57     else:
    58         self._pending_sockets.extend(sockets)
    59 
    60 def start(self, num_processes=1):
    61 #多进程方式启动TCPServer,也就是一个 tornado 运行时,启动多个进程,同时监听一个端口,一般不推荐这么使用,而是采用 Supervisor 启动多个tornado进程,分别监听不同的端口
    62     assert not self._started
    63     self._started = True
    64     if num_processes != 1:
    65         process.fork_processes(num_processes)   #fork 子进程
    66     sockets = self._pending_sockets
    67     self._pending_sockets = []
    68     self.add_sockets(sockets)
    TCPServer
     
  • 相关阅读:
    CountDownLatch, CyclicBarrier, Semaphore
    工具类中使用@Autowired失败问题
    可重入锁(递归锁)
    读写锁
    自旋锁
    加入BLOG
    控制字符串的超长部分用省略号表示
    java常见面试题总结
    maven打包不运行test脚本的命令
    DataGrip使用教程
  • 原文地址:https://www.cnblogs.com/hepingqingfeng/p/6655814.html
Copyright © 2011-2022 走看看