zoukankan      html  css  js  c++  java
  • httpclient源码分析之MainClientExec

    MainClientExec是HTTP请求处理链中最后一个请求执行环节,负责与另一终端的请求/响应交互,也是很重要的类。

    源码版本是4.5.2,主要看execute方法,并在里面添加注释。接着详细说下获取连接的过程。

    execute方法

        @Override
        public CloseableHttpResponse execute(
                final HttpRoute route,
                final HttpRequestWrapper request,
                final HttpClientContext context,
                final HttpExecutionAware execAware) throws IOException, HttpException {
            Args.notNull(route, "HTTP route");
            Args.notNull(request, "HTTP request");
            Args.notNull(context, "HTTP context");
    
            //Auth相关,这里没关注
            AuthState targetAuthState = context.getTargetAuthState();
            if (targetAuthState == null) {
                targetAuthState = new AuthState();
                context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
            }
            AuthState proxyAuthState = context.getProxyAuthState();
            if (proxyAuthState == null) {
                proxyAuthState = new AuthState();
                context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
            }
    
            if (request instanceof HttpEntityEnclosingRequest) {
                RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
            }
    
            //userToken后面作为state,用来从连接池中获取连接的时候使用,默认是null。
            //如果设置了值,会设置到连接中,再次获取的时候,则优先取status相等的连接
            Object userToken = context.getUserToken();
    
            //ConnectionRequest用来获取HttpClientConnection
            //为每一个route设置一个连接池,大小可以配置,默认为2
            //从route连接池获取一个连接,优先取status等于userToken的。
            //这里没有实质的操作,只是创建一个ConnectionRequest,并将获取连接的操作封装在ConnectionRequest中。
            final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
            if (execAware != null) {
                if (execAware.isAborted()) {
                    connRequest.cancel();
                    throw new RequestAbortedException("Request aborted");
                } else {
                    execAware.setCancellable(connRequest);
                }
            }
    
            final RequestConfig config = context.getRequestConfig();
    
            final HttpClientConnection managedConn;
            try {
                final int timeout = config.getConnectionRequestTimeout();
    
                //获取连接,这里才执行从连接池中阻塞获取连接的操作,并设置超时时间。
                //这里返回的connection,不一定是有效的socket连接,长短连接处理方式不同。
                //如果连接没有打开或者不可用,后面会重新建立socket连接。 
                managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
            } catch(final InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new RequestAbortedException("Request aborted", interrupted);
            } catch(final ExecutionException ex) {
                Throwable cause = ex.getCause();
                if (cause == null) {
                    cause = ex;
                }
                throw new RequestAbortedException("Request execution failed", cause);
            }
    
            //将连接加入上下文中,暴露连接。 
            //context就是一个大容器,收藏各种东西,如果觉得有什么资源是需要在别的地方用到的,那就放入context吧。
            context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
    
            //是否检查连接的有效性。如果检查不可用,就关闭连接。对于关闭的连接,后面会从三次握手开始,重新建立socket连接。
            //如果配置检查,就相当于一个悲观锁,每次请求都会消耗最多30ms来检测,影响性能。4.4版本开始就过时了。
            if (config.isStaleConnectionCheckEnabled()) {
                // validate connection,首先判断连接是否是打开的
                if (managedConn.isOpen()) {
                    this.log.debug("Stale connection check");
                    //如果是打开的,进一步判断是否可用
                    if (managedConn.isStale()) {
                        this.log.debug("Stale connection detected");
                        //不可用的时候,需要关闭连接,后面再重新建立连接
                        managedConn.close();
                    }
                }
            }
    
            final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
            try {
                if (execAware != null) {
                    execAware.setCancellable(connHolder);
                }
    
                HttpResponse response;
                for (int execCount = 1;; execCount++) {
    
                    //请求是否幂等的,如果不是,则不能retry,抛异常
                    if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
                        throw new NonRepeatableRequestException("Cannot retry request " +
                                "with a non-repeatable request entity.");
                    }
    
                    if (execAware != null && execAware.isAborted()) {
                        throw new RequestAbortedException("Request aborted");
                    }
    
                    //如果连接没有打开,即连接使用的socket为null,则重新建立连接。    
                    if (!managedConn.isOpen()) {
                        this.log.debug("Opening connection " + route);
                        try {
                            //建立socket连接。
                            //遍历地址集,成功建立socket连接,就返回,封装在connection中
                            establishRoute(proxyAuthState, managedConn, route, request, context);
                        } catch (final TunnelRefusedException ex) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug(ex.getMessage());
                            }
                            response = ex.getResponse();
                            break;
                        }
                    }
                    final int timeout = config.getSocketTimeout();
                    if (timeout >= 0) {
                        //设置socketTimeout
                        managedConn.setSocketTimeout(timeout);
                    }
    
                    if (execAware != null && execAware.isAborted()) {
                        throw new RequestAbortedException("Request aborted");
                    }
    
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Executing request " + request.getRequestLine());
                    }
    
                    if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Target auth state: " + targetAuthState.getState());
                        }
                        this.authenticator.generateAuthResponse(request, targetAuthState, context);
                    }
                    if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Proxy auth state: " + proxyAuthState.getState());
                        }
                        this.authenticator.generateAuthResponse(request, proxyAuthState, context);
                    }
    
                     //和服务器具体交互,发送请求头,如果有响应,再接收响应。   
                    response = requestExecutor.execute(request, managedConn, context);
    
                    //根据配置的策略,判断是否保持连接,永久还是一段时长
                    // The connection is in or can be brought to a re-usable state.
                    if (reuseStrategy.keepAlive(response, context)) {
                        // Set the idle duration of this connection
                        final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                        if (this.log.isDebugEnabled()) {
                            final String s;
                            if (duration > 0) {
                                s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                            } else {
                                s = "indefinitely";
                            }
                            this.log.debug("Connection can be kept alive " + s);
                        }
                        connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                        connHolder.markReusable();
                    } else {
                        connHolder.markNonReusable();
                    }
    
                    //跳过
                    if (needAuthentication(
                            targetAuthState, proxyAuthState, route, response, context)) {
                        // Make sure the response body is fully consumed, if present
                        final HttpEntity entity = response.getEntity();
                        if (connHolder.isReusable()) {
                            EntityUtils.consume(entity);
                        } else {
                            managedConn.close();
                            if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
                                    && proxyAuthState.getAuthScheme() != null
                                    && proxyAuthState.getAuthScheme().isConnectionBased()) {
                                this.log.debug("Resetting proxy auth state");
                                proxyAuthState.reset();
                            }
                            if (targetAuthState.getState() == AuthProtocolState.SUCCESS
                                    && targetAuthState.getAuthScheme() != null
                                    && targetAuthState.getAuthScheme().isConnectionBased()) {
                                this.log.debug("Resetting target auth state");
                                targetAuthState.reset();
                            }
                        }
                        // discard previous auth headers
                        final HttpRequest original = request.getOriginal();
                        if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
                            request.removeHeaders(AUTH.WWW_AUTH_RESP);
                        }
                        if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {
                            request.removeHeaders(AUTH.PROXY_AUTH_RESP);
                        }
                    } else {
                        break;
                    }
                }
    
                if (userToken == null) {
                    userToken = userTokenHandler.getUserToken(context);
                    context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
                }
                if (userToken != null) {
                    connHolder.setState(userToken);
                }
    
                // check for entity, release connection if possible
                //判断是否读取了全部的响应,如果是,则释放连接回连接池,
                //否则,也要返回连接,以便后面继续从流中读取响应。
                final HttpEntity entity = response.getEntity();
                if (entity == null || !entity.isStreaming()) {
                    // connection not needed and (assumed to be) in re-usable state
                    connHolder.releaseConnection();
                    return new HttpResponseProxy(response, null);
                } else {
                    return new HttpResponseProxy(response, connHolder);
                }
            } catch (final ConnectionShutdownException ex) {
                final InterruptedIOException ioex = new InterruptedIOException(
                        "Connection has been shut down");
                ioex.initCause(ex);
                throw ioex;
            } catch (final HttpException ex) {
                connHolder.abortConnection();
                throw ex;
            } catch (final IOException ex) {
                connHolder.abortConnection();
                throw ex;
            } catch (final RuntimeException ex) {
                connHolder.abortConnection();
                throw ex;
            }
        }
    

    总结一下关心的大致流程:

    • 创建连接请求
    • 根据连接请求的参数,从连接池中获取一个连接
    • 配置是否需要校验连接可用性。如果检查不可用,就关闭连接。
    • 如果连接没有打开,则创建一个底层的socket连接。
    • 发送请求头部(如果请求中带有entity,则发送)
    • 如果有响应,接收响应(先接收头部,如果有请求主体,则接收)

    这里有一点注意一下:
    检测连接有效性的时候,报的是SocketTimeOut异常,而真正读响应的时候,报的是Connection reset异常。为什么不一样呢?我还没找到方法验证,但这里很可能是检测的时间很短,只有1ms,首先触发了SocketTimeOut异常,而实际读响应的时候,是不会这么短时间的。

    获取连接

    接下来详细说说根据ConnectionRequest获取HttpClientConnection。即:
    managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);

    首先看ConnectionRequest为何物:

        //org.apache.http.impl.conn.PoolingHttpClientConnectionManager
    
        @Override
        public ConnectionRequest requestConnection(
                final HttpRoute route,
                final Object state) {
            Args.notNull(route, "HTTP route");
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection request: " + format(route, state) + formatStats(route));
            }
            //从连接池中获取一个CPoolEntry(Connection的包装类)
            final Future<CPoolEntry> future = this.pool.lease(route, state, null);
            return new ConnectionRequest() {
    
                @Override
                public boolean cancel() {
                    return future.cancel(true);
                }
    
                // ConnectionRequest的get方法。调用leaseConnection方法,并且传入future(CPoolEntry的封装(connection的封装))
                @Override
                public HttpClientConnection get(
                        final long timeout,
                        final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                    return leaseConnection(future, timeout, tunit);
                }
            };
        }
    

    所以,ConnectionRequest的get方法,实际是调用PoolingHttpClientConnectionManager的leaseConnection,返回一个HttpClientConnection。
    关于获取connection的更多详细信息,可以参考这篇文章,详细讲述了PoolingHttpClientConnectionManager的获取连接给用户的方法。

    补充

    • 关于config.isStaleConnectionCheckEnabled():
      如果设置每次请求检查连接是否可用,会影响性能。4.4版本开始过时,但官方推荐使用org.apache.http.impl.conn.PoolingHttpClientConnectionManager#getValidateAfterInactivity()。详细了解看这篇最后补充的校验连接有效的方法,有一个案例分析。

    • 最后返回的HttpResponseProxy带上ConnectionHolder(响应没有一次读完),这篇文章有一个案例了解,查看成功日志的最后几个步骤。RestTemplate读取扩展字段,第二次读取数据。

    另一篇关于此段源码的解读,见这里

  • 相关阅读:
    仿jquery 选择器功能
    多个div拖拽功能
    js 模拟jquery onready 事件
    随着鼠标移动的图片百叶窗效果
    计算体重引发的思考
    js 模拟事件
    表单验证功能(利用冒泡功能)
    视频播放滚动条(最终完善版)
    仿制视频播放滚动条效果(加左右控制按钮)
    无极树(待整理)
  • 原文地址:https://www.cnblogs.com/shoren/p/httpclient-MainClientExec.html
Copyright © 2011-2022 走看看