  A Deep Dive into http1connection in Tornado

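    Preface

      The http1connection module plays a central role in Tornado: it implements the HTTP/1.x protocol.

      Built on the gen and iostream modules, it processes requests and responses asynchronously.

      Reading this article requires some basic knowledge of HTTP.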

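    Main text:

      HTTP is an application-layer protocol built on top of TCP. In Tornado, the TCP layer is handled by TCPServer and IOStream, while reading and parsing HTTP messages is the job of http1connection. Once a message has been parsed, it is handed to a delegate instance for further processing.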

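    Here is the overall flow, with Tornado acting as the server:

      Before a client can send an HTTP request to the server, a TCP connection must be established.

      Once the connection is established, Tornado wraps it in an IOStream object, which can read from and write to the connection asynchronously.

      On top of that, Tornado implements two classes, HTTP1ServerConnection and HTTP1Connection. Both rely on the underlying IOStream to read from and write to the socket, and together they implement HTTP/1.x.

      HTTP1Connection is mainly responsible for handling an HTTP transaction (as defined in HTTP: The Definitive Guide, a transaction consists of one request and the response to that request). It implements the first half itself, namely reading and parsing the message's start line, headers and body; the second half is done in cooperation with an HTTPMessageDelegate.

      The HTTPMessageDelegate dispatches the message parsed by HTTP1Connection, and other classes (RequestHandler, for example) then run the actual business logic.

      Those classes generate a response and write it to the IOStream. At that point the HTTP transaction is complete, and we must decide, depending on the situation, whether to close the connection.

      HTTP1ServerConnection, in turn, keeps creating HTTP1Connection instances, i.e. it keeps processing HTTP transactions until the connection is closed.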
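
      To make this division of labour concrete, here is a minimal sketch of a delegate under toy assumptions: ToyDispatcher and HelloHandler are hypothetical names, start_line is a plain (method, path, version) tuple and headers is a plain dict. Only the callback names headers_received, data_received and finish are borrowed from tornado's HTTPMessageDelegate interface; the routing and method lookup are a simplified stand-in for what _RequestDispatcher and RequestHandler really do.

```python
import re


class HelloHandler:
    """Hypothetical stand-in for a RequestHandler subclass."""

    def __init__(self, body):
        self.body = body

    def get(self):
        return b"hello"

    def post(self):
        return b"echo: " + self.body


class ToyDispatcher:
    """Hypothetical delegate: same callback names as tornado's
    HTTPMessageDelegate, much simpler behaviour."""

    def __init__(self, routes):
        # routes: list of (path regex, handler class); anchor at the end.
        self.routes = [(re.compile(p + r"\Z"), cls) for p, cls in routes]
        self.handler_class = None
        self.method = None
        self.chunks = []
        self.response = None

    def headers_received(self, start_line, headers):
        # start_line is (method, path, version): pick a handler by path.
        self.method, path, _version = start_line
        for pattern, cls in self.routes:
            if pattern.match(path):
                self.handler_class = cls
                break

    def data_received(self, chunk):
        # The body may arrive in several pieces; buffer them.
        self.chunks.append(chunk)

    def finish(self):
        # Whole message received: instantiate the handler and run it.
        if self.handler_class is None:
            self.response = (404, b"not found")
            return
        handler = self.handler_class(b"".join(self.chunks))
        # Call the method named after the HTTP verb, e.g. POST -> post().
        action = getattr(handler, self.method.lower(), None)
        if action is None:
            self.response = (405, b"method not allowed")
        else:
            self.response = (200, action())
```

      Driving it by hand in the order a connection would (headers_received with ("POST", "/hello", "HTTP/1.1"), then data_received(b"ab") and data_received(b"c"), then finish()) leaves response equal to (200, b"echo: abc"). The connection side only ever calls the callbacks in order; routing and business logic stay entirely on the delegate side.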

    The HTTP1Connection class:

      Let's start with HTTP1Connection. Its work can be summarized as two tasks:

        1  Reading and parsing messages

        2  Writing messages

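    1 Reading and parsing messages

      When a request or response arrives, read_response is the entry point for parsing it. Although the name suggests it only handles responses, requests and responses share almost the same format, so it can process either. read_response mainly calls _read_message, and all of the parsing logic lives in _read_message. This article focuses on the case where Tornado acts as the server.

    First, the overall logic of _read_message:

      HTTP1Connection reads the request message through the iostream and parses it: it separates the start line from the request headers, and uses the headers to decide whether to read a message body and how long that body is.

      Along the way, the start line is handed to the delegate (HTTPMessageDelegate), which looks up the matching RequestHandler (mostly done by the _RequestDispatcher class), instantiates it, and calls the method named by the request method in the start line. While running the business logic, the handler may use templates, cookies, CSRF protection and so on, but in the end it produces a response and writes it to the IOStream. All of this is the delegate's job.

      Finally, HTTP1Connection waits (asynchronously) until the response has been fully written, and then decides, based on whether keep-alive is supported, whether to close the connection once processing is done.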
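
      The keep-alive decision (the _can_keep_alive call in _read_message) can be approximated by the general HTTP/1.x rule: an HTTP/1.1 connection persists unless `Connection: close` is sent, while an HTTP/1.0 connection persists only when `Connection: keep-alive` is sent. The function below is a simplified sketch of that rule, not tornado's implementation; tornado's version may check additional conditions, and it uses case-insensitive HTTPHeaders where this sketch assumes a plain dict with title-case keys.

```python
def can_keep_alive(version, headers):
    """Simplified HTTP/1.x persistence rule (not tornado's exact code).

    version: "HTTP/1.0" or "HTTP/1.1".
    headers: plain dict whose keys are assumed to be title case.
    """
    connection = headers.get("Connection", "").lower()
    if version == "HTTP/1.1":
        # HTTP/1.1 connections are persistent unless told otherwise.
        return connection != "close"
    if version == "HTTP/1.0":
        # HTTP/1.0 connections close unless the peer opts in.
        return connection == "keep-alive"
    return False
```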

        def read_response(self, delegate):
            """Read a single HTTP response.
            """
            if self.params.decompress:
                delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
            return self._read_message(delegate)
    
        @gen.coroutine
        def _read_message(self, delegate):    # self is the HTTP1Connection instance; delegate is a _ServerRequestAdapter instance
            need_delegate_close = False
            try:
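                header_future = self.stream.read_until_regex(b"\r?\n\r?\n", max_bytes=self.params.max_header_size)
                # There are two ways to wait for the header block (the request in server mode,
                # the response in client mode): the first simply reads it whenever it arrives;
                # the second schedules a timeout and closes the connection if nothing arrives in time
                if self.params.header_timeout is None:  # first way
                    header_data = yield header_future  # raw bytes of the start line and headers
                else:    # second way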
                    try:
                        header_data = yield gen.with_timeout(
                            self.stream.io_loop.time() + self.params.header_timeout,
                            header_future,
                            io_loop=self.stream.io_loop,
                            quiet_exceptions=iostream.StreamClosedError)
                    except gen.TimeoutError:
                        self.close()
                        raise gen.Return(False)
                start_line, headers = self._parse_headers(header_data)    # split out the start line and the headers
                if self.is_client:
                    start_line = httputil.parse_response_start_line(start_line)
                    self._response_start_line = start_line
                else:
                    start_line = httputil.parse_request_start_line(start_line)
                    self._request_start_line = start_line
                    self._request_headers = headers
    
                self._disconnect_on_finish = not self._can_keep_alive(start_line, headers)  # without keep-alive, close the connection once the response is done
                need_delegate_close = True
                with _ExceptionLoggingContext(app_log):
                    # A lot happens here: on the server side this builds the request object
                    # and picks the handler that matches the request URL
                    header_future = delegate.headers_received(start_line, headers)
                    if header_future is not None:
                        yield header_future
                if self.stream is None:
                    # We've been detached.
                    need_delegate_close = False
                    raise gen.Return(False)
                skip_body = False
                if self.is_client:  # acting as a client
                    if (self._request_start_line is not None and
                            self._request_start_line.method == 'HEAD'):  # a response to HEAD has no body; any body is ignored
                        skip_body = True
                    code = start_line.code
                    # 304 is returned when the client sent a conditional GET and the document has not
                    # changed (since the last access, or according to the request's conditions)
                    if code == 304:
                        # A 304 response may carry a Content-Length header, but it never has a body
                        # http://tools.ietf.org/html/rfc7230#section-3.3
                        skip_body = True
                    if code >= 100 and code < 200:  # interim response: a client must be ready for one or more 1xx responses before the final one
                        # 1xx responses must not contain a body
                        if ('Content-Length' in headers or
                                'Transfer-Encoding' in headers):
                            raise httputil.HTTPInputError(
                                "Response code %d cannot have body" % code)
                        # The real response has not arrived yet, so keep reading
                        yield self._read_message(delegate)
                else:
                    # Expect: 100-continue lets a client ask the server, before sending POST data,
                    # whether it will accept the body. If not, the client skips the upload; if so,
                    # it sends the data. In practice it is mostly used when POSTing large bodies.
                    # http://www.cnblogs.com/tekkaman/archive/2013/04/03/2997781.html
                    if (headers.get("Expect") == "100-continue" and not self._write_finished):
                        # Supported by default, so answer with 100 as soon as the headers arrive
                        self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
                if not skip_body:
                    # Read the message body
                    body_future = self._read_body(
                        start_line.code if self.is_client else 0, headers, delegate)
                    if body_future is not None:
                        if self._body_timeout is None:
                            yield body_future
                        else:
                            try:
                                yield gen.with_timeout(
                                    self.stream.io_loop.time() + self._body_timeout,
                                    body_future, self.stream.io_loop,
                                    quiet_exceptions=iostream.StreamClosedError)
                            except gen.TimeoutError:
                                gen_log.info("Timeout reading body from %s",
                                             self.context)
                                self.stream.close()
                                raise gen.Return(False)
                self._read_finished = True
                if not self._write_finished or self.is_client:
                    need_delegate_close = False
                    with _ExceptionLoggingContext(app_log):
                        # On the server side this instantiates the matching handler, runs the
                        # business logic, and finally writes the response to the IOStream
                        delegate.finish()
                # If we're waiting for the application to produce an asynchronous
                # response, and we're not detached, register a close callback
                # on the stream (we didn't need one while we were reading)
                # Wait until the asynchronous response is complete and all data has been written
                # to the fd before continuing; see _finish_request/finish for details.
                # When the asynchronous write completes, HTTPServerRequest calls this object's
                # finish method, which calls _finish_request, which in turn calls set_result
                # on the _finish_future object
                if (not self._finish_future.done() and
                        self.stream is not None and
                        not self.stream.closed()):
                    self.stream.set_close_callback(self._on_connection_close)
                    yield self._finish_future
                # Decide whether to close the connection: a server usually waits for the client
                # to close it, while a client closes it depending on whether it is persistent
                if self.is_client and self._disconnect_on_finish:
                    self.close()
                if self.stream is None:
                    raise gen.Return(False)
            except httputil.HTTPInputError as e:
                gen_log.info("Malformed HTTP message from %s: %s",
                             self.context, e)
                self.close()
                raise gen.Return(False)
            finally:
                if need_delegate_close:
                    with _ExceptionLoggingContext(app_log):
                        delegate.on_connection_close()
                self._clear_callbacks()
            raise gen.Return(True)
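
      The first step of _read_message, reading the header block and optionally bounding the wait with header_timeout, can be sketched with asyncio in place of IOStream and gen.with_timeout. This is an illustration under assumptions: read_head is a hypothetical helper, it only accepts CRLF line endings (the regex above also accepts bare `\n`), headers go into a plain dict, and the size limit is left to the StreamReader's own `limit` argument rather than max_header_size.

```python
import asyncio


async def read_head(reader, timeout=None):
    """Read and parse one HTTP/1.x message head (hypothetical helper).

    reader: asyncio.StreamReader; timeout: seconds, or None to wait forever.
    Returns ((method, path, version), headers_dict).
    """
    read = reader.readuntil(b"\r\n\r\n")  # head ends with an empty line
    # Like header_timeout: give up if the head does not arrive in time.
    data = await (read if timeout is None else asyncio.wait_for(read, timeout))
    start_line, *header_lines = data.decode("latin1").split("\r\n")
    headers = {}
    for line in header_lines:
        if not line:
            continue  # the trailing empty strings left by the final CRLFs
        name, _, value = line.partition(":")
        headers[name.strip()] = value.strip()
    method, path, version = start_line.split(" ", 2)
    return (method, path, version), headers
```

      Feeding b"GET /index HTTP/1.1\r\nHost: example.com\r\n\r\n" into a StreamReader and awaiting read_head(reader, timeout=1.0) yields (("GET", "/index", "HTTP/1.1"), {"Host": "example.com"}). If nothing arrives before the timeout, asyncio.wait_for raises asyncio.TimeoutError, which corresponds to the gen.TimeoutError branch that closes the connection.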

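    Note the different ways this method can read the message body:
      1 If keep-alive is not in use and the message carries neither Content-Length nor chunked encoding, the end of the connection marks the end of the message. In the code this path is only taken when Tornado is the client; a request without either header is treated as having no body.

      2 If Content-Length is present, it determines where the current message ends. This is what keeps a persistent (keep-alive) connection usable, since closing the connection can no longer mark the end of each message.

      3 With chunked transfer encoding (Transfer-Encoding: chunked), Content-Length does not apply; in fact, Tornado rejects a message that specifies both with an error. The body is read according to the chunked format until a chunk of size 0 arrives, which marks the end of the message.

    _read_body picks one of these ways depending on the situation; the three options above correspond to the following three methods: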

      1 _read_body_until_close

      2 _read_fixed_body

      3 _read_chunked_body
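
      The selection rules, including the check for duplicated Content-Length values, can be condensed into a pure function. choose_body_reader is a hypothetical sketch that follows the order of checks in _read_body but returns a label instead of starting a read; it raises ValueError where tornado raises HTTPInputError, assumes a plain dict of headers, and omits the max_body_size check.

```python
import re


def choose_body_reader(code, headers, is_client):
    """Return (strategy, content_length), mimicking _read_body's checks.

    strategy is "fixed", "chunked", "until_close" or None (no body).
    code is the response status (0 for requests, as in _read_message).
    """
    content_length = None
    if "Content-Length" in headers:
        if "Transfer-Encoding" in headers:
            raise ValueError("both Transfer-Encoding and Content-Length")
        # Proxies may duplicate the header as "42, 42"; equal copies are OK.
        pieces = re.split(r",\s*", headers["Content-Length"])
        if any(p != pieces[0] for p in pieces):
            raise ValueError("multiple unequal Content-Lengths")
        content_length = int(pieces[0])  # ValueError if not an integer
    if code == 204:
        # 204 has an implicit length of zero, never read-until-close.
        if "Transfer-Encoding" in headers or content_length not in (None, 0):
            raise ValueError("204 response must not have a body")
        content_length = 0
    if content_length is not None:
        return "fixed", content_length
    if headers.get("Transfer-Encoding") == "chunked":
        return "chunked", None
    if is_client:
        return "until_close", None
    return None, None
```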

     Here is the code:

        def _read_body(self, code, headers, delegate):
            # https://imququ.com/post/transfer-encoding-header-in-http.html
            if "Content-Length" in headers:
                if "Transfer-Encoding" in headers:
                    # Response cannot contain both Content-Length and
                    # Transfer-Encoding headers.
                    # If a message is received with both a Transfer-Encoding and a
                    # Content-Length header field, the Transfer-Encoding overrides the
                    # Content-Length.  
                    # http://tools.ietf.org/html/rfc7230#section-3.3.3
                    raise httputil.HTTPInputError("Response with both Transfer-Encoding and Content-Length")
                if "," in headers["Content-Length"]:
                    # Proxies sometimes cause Content-Length headers to get
                    # duplicated.  If all the values are identical then we can
                    # use them but if they differ it's an error.
                    pieces = re.split(r',\s*', headers["Content-Length"])
                    if any(i != pieces[0] for i in pieces):
                        raise httputil.HTTPInputError(
                            "Multiple unequal Content-Lengths: %r" %
                            headers["Content-Length"])
                    headers["Content-Length"] = pieces[0]
    
                try:
                    content_length = int(headers["Content-Length"])
                except ValueError:
                    # Handles non-integer Content-Length value.
                    raise httputil.HTTPInputError(
                        "Only integer Content-Length is allowed: %s" % headers["Content-Length"])
    
                if content_length > self._max_body_size:
                    raise httputil.HTTPInputError("Content-Length too long")
            else:
                content_length = None
    
            if code == 204:        # status code 204 (No Content)
                # This response code is not allowed to have a non-empty body,
                # and has an implicit length of zero instead of read-until-close.
                # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
                if ("Transfer-Encoding" in headers or content_length not in (None, 0)):
                    raise httputil.HTTPInputError(
                        "Response with code %d should not have body" % code)
                content_length = 0
    
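            if content_length is not None:  # a known Content-Length marks where the message ends (this is what keep-alive relies on)
                return self._read_fixed_body(content_length, delegate)
            if headers.get("Transfer-Encoding") == "chunked":  # chunked transfer encoding
                return self._read_chunked_body(delegate)
            if self.is_client:  # no length information (non-persistent connection): read until the peer closes
                return self._read_body_until_close(delegate)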
                return self._read_body_until_close(delegate)
            return None
    
        @gen.coroutine
        def _read_fixed_body(self, content_length, delegate):
            while content_length > 0:
                body = yield self.stream.read_bytes(min(self.params.chunk_size, content_length), partial=True)
                content_length -= len(body)
                if not self._write_finished or self.is_client:
                    with _ExceptionLoggingContext(app_log):
                        ret = delegate.data_received(body)
                        if ret is not None:
                            yield ret
    
        @gen.coroutine
        def _read_chunked_body(self, delegate):
            # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
            total_size = 0
            while True:
                # First read the chunk size line
                chunk_len = yield self.stream.read_until(b"\r\n", max_bytes=64)
                chunk_len = int(chunk_len.strip(), 16)
                # A chunk size of 0 marks the end of the chunked body
                if chunk_len == 0:
                    return
                total_size += chunk_len
                # Reject bodies that are too large
                if total_size > self._max_body_size:
                    raise httputil.HTTPInputError("chunked body too large")
                bytes_to_read = chunk_len
                while bytes_to_read:
                    # Read the data belonging to this chunk
                    chunk = yield self.stream.read_bytes(
                        min(bytes_to_read, self.params.chunk_size), partial=True)
                    bytes_to_read -= len(chunk)
                    if not self._write_finished or self.is_client:
                        with _ExceptionLoggingContext(app_log):
                            # The body data is handed to the delegate (HTTPMessageDelegate)
                            ret = delegate.data_received(chunk)
                            if ret is not None:
                                yield ret
                # chunk ends with \r\n
                # every chunk's data is followed by a CRLF
                crlf = yield self.stream.read_bytes(2)
                assert crlf == b"\r\n"
    
        @gen.coroutine
        def _read_body_until_close(self, delegate):
            body = yield self.stream.read_until_close()
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    delegate.data_received(body)
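
      For reference, the framing that _read_chunked_body consumes incrementally can also be decoded in one pass when the whole body is already in memory. decode_chunked is a hypothetical helper written for illustration; like the code above it does not support chunk extensions or trailers, and it raises ValueError on malformed input instead of HTTPInputError.

```python
def decode_chunked(data):
    """Decode a complete chunked-encoded body held in memory.

    Framing: hex size line, CRLF, data, CRLF, repeated until a size-0
    chunk. No chunk extensions or trailers in this sketch.
    """
    body = bytearray()
    pos = 0
    while True:
        end = data.index(b"\r\n", pos)          # end of the size line
        size = int(data[pos:end].strip(), 16)   # size is hexadecimal
        pos = end + 2
        if size == 0:
            return bytes(body)                  # terminating chunk
        body += data[pos:pos + size]
        pos += size
        if data[pos:pos + 2] != b"\r\n":       # each chunk ends with CRLF
            raise ValueError("chunk not terminated by CRLF")
        pos += 2
```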

    2 Writing messages

      Writing a message breaks down into two steps:

        1  Writing the start line and headers

        2  Writing the message body
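
      On the server side, the first step boils down to deciding whether to chunk and then serializing the head. build_response_head is a hypothetical sketch that follows the server-side conditions in write_headers: chunked output only when the request was HTTP/1.1, the status is not 204 or 304, and the application set neither Content-Length nor Transfer-Encoding. It uses a plain dict and, unlike tornado, also rejects a bare `\r` in a header line.

```python
def build_response_head(request_version, code, reason, headers):
    """Serialize a response start line and headers (hypothetical helper).

    Returns (head_bytes, chunking).
    """
    headers = dict(headers)  # don't mutate the caller's dict
    chunking = (request_version == "HTTP/1.1"
                and code not in (204, 304)
                and "Content-Length" not in headers
                and "Transfer-Encoding" not in headers)
    if chunking:
        headers["Transfer-Encoding"] = "chunked"
    lines = ["HTTP/1.1 %d %s" % (code, reason)]
    lines += ["%s: %s" % (name, value) for name, value in headers.items()]
    for line in lines:
        # A line break inside a header would let a value inject extra headers.
        if "\n" in line or "\r" in line:
            raise ValueError("Newline in header: " + repr(line))
    # Lines are joined with CRLF; an empty line ends the head.
    return ("\r\n".join(lines) + "\r\n\r\n").encode("latin1"), chunking
```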

    There are three main methods involved; here is the source:

        def write_headers(self, start_line, headers, chunk=None, callback=None):
            """Implements `.HTTPConnection.write_headers`. Writes the start line and headers."""
            lines = []
            if self.is_client:    # client side: we are sending a request
                self._request_start_line = start_line
                lines.append(utf8('%s %s HTTP/1.1' % (start_line[0], start_line[1])))
                # Client requests with a non-empty body must have either a
                # Content-Length or a Transfer-Encoding.
                self._chunking_output = (
                    start_line.method in ('POST', 'PUT', 'PATCH') and
                    'Content-Length' not in headers and
                    'Transfer-Encoding' not in headers)
            else:    # server side: we are sending a response
                self._response_start_line = start_line
                lines.append(utf8('HTTP/1.1 %d %s' % (start_line[1], start_line[2])))
                self._chunking_output = (
                    # TODO: should this use
                    # self._request_start_line.version or
                    # start_line.version?
                    self._request_start_line.version == 'HTTP/1.1' and
                    # 304 responses have no body (not even a zero-length body), and so
                    # should not have either Content-Length or Transfer-Encoding.
                    # headers.
                    start_line.code not in (204, 304) and
                    # No need to chunk the output if a Content-Length is specified.
                    'Content-Length' not in headers and
                    # Applications are discouraged from touching Transfer-Encoding,
                    # but if they do, leave it alone.
                    'Transfer-Encoding' not in headers)
                # If a 1.0 client asked for keep-alive, add the header.
                if (self._request_start_line.version == 'HTTP/1.0' and
                    (self._request_headers.get('Connection', '').lower() ==
                     'keep-alive')):
                    headers['Connection'] = 'Keep-Alive'
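            # Whether acting as client or server, tornado uses chunked transfer encoding by default (when the conditions above allow it)
            if self._chunking_output:
                headers['Transfer-Encoding'] = 'chunked'
            # In the following cases the message body must be empty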
            if (not self.is_client and
                (self._request_start_line.method == 'HEAD' or
                 start_line.code == 304)):
                self._expected_content_remaining = 0
            elif 'Content-Length' in headers:
                self._expected_content_remaining = int(headers['Content-Length'])
            else:
                self._expected_content_remaining = None
            # TODO: headers are supposed to be of type str, but we still have some
            # cases that let bytes slip through. Remove these native_str calls when those
            # are fixed.
            header_lines = (native_str(n) + ": " + native_str(v) for n, v in headers.get_all())
            if PY3:
                lines.extend(l.encode('latin1') for l in header_lines)
            else:
                lines.extend(header_lines)
            for line in lines:
                if b'\n' in line:
                    raise ValueError('Newline in header: ' + repr(line))
            future = None
            if self.stream.closed():
                future = self._write_future = Future()
                future.set_exception(iostream.StreamClosedError())
                future.exception()
            else:
                if callback is not None:
                    self._write_callback = stack_context.wrap(callback)
                else:
                    future = self._write_future = Future()
                data = b"\r\n".join(lines) + b"\r\n\r\n"
                if chunk:
                    data += self._format_chunk(chunk)
                self._pending_write = self.stream.write(data)
                self._pending_write.add_done_callback(self._on_write_complete)
            return future
    
        def _format_chunk(self, chunk):
            '''If chunked transfer encoding is in use, convert chunk into the format HTTP requires; otherwise return it unchanged.'''
            if self._expected_content_remaining is not None:
                self._expected_content_remaining -= len(chunk)
                if self._expected_content_remaining < 0:
                    # Close the stream now to stop further framing errors.
                    self.stream.close()
                    raise httputil.HTTPOutputError(
                        "Tried to write more data than Content-Length")
            if self._chunking_output and chunk:
                # Don't write out empty chunks because that means END-OF-STREAM
                # with chunked encoding
                return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
            else:
                return chunk
    
        def write(self, chunk, callback=None):
            """Implements `.HTTPConnection.write`. Writes the message body.
    
            For backwards compatibility it is allowed but deprecated to
            skip `write_headers` and instead call `write()` with a
            pre-encoded header block.
            """
            future = None
            if self.stream.closed():
                future = self._write_future = Future()
                self._write_future.set_exception(iostream.StreamClosedError())
                self._write_future.exception()
            else:
                if callback is not None:
                    self._write_callback = stack_context.wrap(callback)
                else:
                    future = self._write_future = Future()
                self._pending_write = self.stream.write(self._format_chunk(chunk))
                self._pending_write.add_done_callback(self._on_write_complete)
            return future
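
      The bookkeeping in _format_chunk can be isolated into a small class. BodyWriter is hypothetical: expected_remaining plays the role of _expected_content_remaining and ValueError stands in for HTTPOutputError. Its finish method, which emits the terminating zero-size chunk `0\r\n\r\n` that chunked encoding requires, is an addition for completeness rather than a copy of tornado's code.

```python
class BodyWriter:
    """Sketch of the _format_chunk bookkeeping (hypothetical class).

    expected_remaining: None means "length unknown"; otherwise it counts
    down with every chunk written.
    """

    def __init__(self, chunking, content_length=None):
        self.chunking = chunking
        self.expected_remaining = content_length

    def format_chunk(self, chunk):
        if self.expected_remaining is not None:
            self.expected_remaining -= len(chunk)
            if self.expected_remaining < 0:
                raise ValueError("Tried to write more data than Content-Length")
        if self.chunking and chunk:
            # Empty chunks are skipped: a zero-size chunk means end of body.
            return b"%x\r\n" % len(chunk) + chunk + b"\r\n"
        return chunk

    def finish(self):
        if self.expected_remaining:  # not None and not 0: body too short
            raise ValueError("Tried to write %d bytes less than Content-Length"
                             % self.expected_remaining)
        # Chunked bodies end with a zero-size chunk and an empty line.
        return b"0\r\n\r\n" if self.chunking else b""
```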

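    The HTTP1ServerConnection class

    HTTP1ServerConnection is fairly simple; it implements the server-side processing logic:

      On this TCP connection it keeps processing HTTP transactions (unless, of course, the client does not support persistent connections, in which case the TCP connection is closed after a single transaction).

      When an exception occurs, it closes the connection.

    start_serving is the entry point; internally it calls _server_request_loop. Here is the code: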

        def start_serving(self, delegate):
            """
                Starts serving requests on this connection.
    
                :arg delegate: a `.HTTPServerConnectionDelegate`
            """
            assert isinstance(delegate, httputil.HTTPServerConnectionDelegate) 
            self._serving_future = self._server_request_loop(delegate)
            # Register the future on the IOLoop so its errors get logged.
            self.stream.io_loop.add_future(self._serving_future, lambda f: f.result())
    
        @gen.coroutine
        def _server_request_loop(self, delegate):
            try:
                while True:
                    # Keep processing HTTP transactions until the connection is closed or an error occurs
                    conn = HTTP1Connection(self.stream, False, self.params, self.context)
                    request_delegate = delegate.start_request(self, conn)    
                    try:
                        ret = yield conn.read_response(request_delegate)
                    except (iostream.StreamClosedError, iostream.UnsatisfiableReadError):  # connection closed
                        return
                    except _QuietException:
                        # This exception was already logged.
                        conn.close()
                        return
                    except Exception:
                        gen_log.error("Uncaught exception", exc_info=True)
                        conn.close()
                        return
                    if not ret:
                        return
                    yield gen.moment
            finally:
                delegate.on_close(self)
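
      The shape of _server_request_loop, one transaction per loop iteration until the peer closes the connection or keep-alive ends, can be mimicked with asyncio. serve_connection and its handle callback are hypothetical; to stay short, the sketch only handles requests without a body, folds the keep-alive rule into the loop, and treats asyncio.IncompleteReadError at end of stream the way the tornado loop treats StreamClosedError.

```python
import asyncio


async def serve_connection(reader, handle):
    """Toy model of _server_request_loop (hypothetical helper).

    reader: asyncio.StreamReader for one TCP connection.
    handle: callback (start_line, headers) standing in for the delegate.
    Only body-less requests are supported. Returns the number served.
    """
    served = 0
    while True:  # one HTTP transaction per iteration
        try:
            head = await reader.readuntil(b"\r\n\r\n")
        except asyncio.IncompleteReadError:
            # Peer closed the connection (tornado: StreamClosedError).
            return served
        start_line, *lines = head.decode("latin1").split("\r\n")
        headers = {}
        for line in lines:
            if line:
                name, _, value = line.partition(":")
                headers[name.strip()] = value.strip()
        handle(start_line, headers)
        served += 1
        version = start_line.rsplit(" ", 1)[-1]
        connection = headers.get("Connection", "").lower()
        if version == "HTTP/1.1":
            keep_alive = connection != "close"
        else:
            keep_alive = connection == "keep-alive"
        if not keep_alive:
            return served  # non-persistent: stop after this transaction
```

      Two pipelined HTTP/1.1 requests on one stream are both served, whereas a `Connection: close` on the first request ends the loop after one transaction, mirroring how HTTP1ServerConnection keeps creating HTTP1Connection instances only while the connection stays open.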

    References:

      http://www.cnblogs.com/tekkaman/archive/2013/04/03/2997781.html

      http://strawhatfy.github.io/2015/11/02/tornado.http1connection.HTTP1Connection/

  Original article: https://www.cnblogs.com/MnCu8261/p/6888904.html