代码改变世界
[登录 · 注册]
  • HttpClient连接池的一些思考
  • 前言

    在分布式系统中,http服务调用少不了HttpClient工具类。相信大家使用apache的HttpClient进行http的交互处理已经很长时间了,而httpclient内部其实使用了http连接池,想必大家也没有关心过连接池的管理。事实上,通过分析httpclient源码,发现它很优雅地隐藏了所有的连接池管理细节,开发者完全不用花太多时间去思考连接池的问题。但是连接池的一些常用配置和原理都需要我们多少掌握一些,避免错用导致生产环境出现无法预知的bug。

    我之前遇到过生产环境系统使用HttpClient工具类导致系统假死的问题。

    情景1:httpclient调用服务时没有设置超时时间,http线程都处于自身阻塞状态,拖垮应用服务器。

    情景2:httpclient连接池单个路由最大连接数设置不合理,业务高峰期时无法及时从连接池中获取连接,导致http线程都处于BLOCKED(阻塞于锁)状态,逐渐拖垮应用服务器。

    从以上两个场景分析一下,首先对于超时时间的设置,要考虑http请求连接超时、http请求读取内容超时、从连接池中获取可用http连接超时,要根据具体的业务场景合理化配置。还有就是最大连接池数和每个路由最大连接数配置,也需要考虑哪些业务是否有业务高峰期然后合理配置,针对特殊的业务设置使用唯一的HttpClient,即为该业务创建指定的线程池。

    Apache官网例子

    CloseableHttpClient httpclient = HttpClients.createDefault();
    HttpGet httpget = new HttpGet("http://localhost/");
    CloseableHttpResponse response = httpclient.execute(httpget);
    try {
        HttpEntity entity = response.getEntity();
        if (entity != null) {
            long len = entity.getContentLength();
            if (len != -1 && len < 2048) {
                System.out.println(EntityUtils.toString(entity));
            } else {
                // Stream content out
            }
        }
    } finally {
        response.close();
    }

    HttpClient及其连接池配置

    • 整个线程池中最大连接数 MAX_CONNECTION_TOTAL = 800
    • 路由到某台主机最大并发数,是MAX_CONNECTION_TOTAL(整个线程池中最大连接数)的一个细分 ROUTE_MAX_COUNT = 500
    • 重试次数,防止失败情况 RETRY_COUNT = 3
    • 客户端和服务器建立连接的超时时间 CONNECTION_TIME_OUT = 5000
    • 客户端从服务器读取数据的超时时间 READ_TIME_OUT = 7000
    • 从连接池中获取连接的超时时间 CONNECTION_REQUEST_TIME_OUT = 5000
    • 连接空闲超时,清楚闲置的连接 CONNECTION_IDLE_TIME_OUT = 5000
    • 连接保持存活时间 DEFAULT_KEEP_ALIVE_TIME_MILLIS = 20 * 1000

    MaxtTotal和DefaultMaxPerRoute的区别

    • MaxtTotal是整个池子的大小;
    • DefaultMaxPerRoute是根据连接到的主机对MaxTotal的一个细分;

    比如:MaxtTotal=400,DefaultMaxPerRoute=200,而我只连接到http://hjzgg.com时,到这个主机的并发最多只有200;而不是400;而我连接到http://qyxjj.com 和 http://httls.com时,到每个主机的并发最多只有200;即加起来是400(但不能超过400)。所以起作用的设置是DefaultMaxPerRoute。

    HttpClient连接池模型

    HttpClient从连接池中获取连接源码分析

    org.apache.http.pool.AbstractConnPool

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final PoolEntryFuture<E> future)
                throws IOException, InterruptedException, TimeoutException {
    
    
        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date
                (System.currentTimeMillis() + tunit.toMillis(timeout));
        }
    
    
        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);//这是每一个路由细分出来的连接池
            E entry = null;
            while (entry == null) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                //从池子中获取一个可用连接并返回
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    } else if (this.validateAfterInactivity > 0) {
                        if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
                            if (!validate(entry)) {
                                entry.close();
                            }
                        }
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }
    
                //创建新的连接
                // New connection is needed
                final int maxPerRoute = getMax(route);//获取当前路由最大并发数
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {//如果当前路由对应的连接池的连接超过最大路由并发数,获取到最后使用的一次连接,释放掉
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
    
                //尝试创建新的连接 
                if (pool.getAllocatedCount() < maxPerRoute) {//当前路由对应的连接池可用空闲连接数+当前路由对应的连接池已用连接数 < 当前路由对应的连接池最大并发数
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {//线程池中可用空闲连接数 > (线程池中最大连接数 - 线程池中已用连接数 - 1)
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }
    
    
                boolean success = false;
                try {
                    pool.queue(future);
                    this.pending.add(future);
                    success = future.await(deadline);
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null) &&
                    (deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

    HttpClient从连接池中获取连接流程图

    连接重用和保持策略

    http的长连接复用, 其判定规则主要分两类。
      1. http协议支持+请求/响应header指定
      2. 一次交互处理的完整性(响应内容消费干净)
      对于前者, httpclient引入了ConnectionReuseStrategy来处理, 默认的采用如下的约定:

    • HTTP/1.0通过在Header中添加Connection:Keep-Alive来表示支持长连接。
    • HTTP/1.1默认支持长连接, 除非在Header中显式指定Connection:Close, 才被视为短连接模式。

    HttpClientBuilder创建MainClientExec

    ConnectionReuseStrategy(连接重用策略) 

    org.apache.http.impl.client.DefaultClientConnectionReuseStrategy

    MainClientExec处理连接

    处理完请求后,获取到response,通过ConnectionReuseStrategy判断连接是否可重用,如果是通过ConnectionKeepAliveStrategy获取到连接最长有效时间,并设置连接可重用标记。

    连接重用判断逻辑

    • request首部中包含Connection:Close,不复用
    • response中Content-Length长度设置不正确,不复用
    • response首部包含Connection:Close,不复用
    • reponse首部包含Connection:Keep-Alive,复用
    • 都没命中的情况下,如果HTTP版本高于1.0则复用

    更多参考:https://www.cnblogs.com/mumuxinfei/p/9121829.html

    连接释放原理分析

    HttpClientBuilder会构建一个InternalHttpClient实例,也是CloseableHttpClient实例。InternalHttpClient的doExecute方法来完成一次request的执行。

    会继续调用MainClientExec的execute方法,通过连接池管理者获取连接(HttpClientConnection)。

    构建ConnectionHolder类型对象,传递连接池管理者对象和当前连接对象。

    请求执行完返回HttpResponse类型对象,然后包装成HttpResponseProxy对象(是CloseableHttpResponse实例)返回。

    CloseableHttpClient类其中一个execute方法如下,finally方法中会调用HttpResponseProxy对象的close方法释放连接。

    最终调用ConnectionHolder的releaseConnection方法释放连接。

    CloseableHttpClient类另一个execute方法如下,返回一个HttpResponseProxy对象(是CloseableHttpResponse实例)。 

    这种情况下调用者获取了HttpResponseProxy对象,可以直接拿到HttpEntity对象。大家关心的就是操作完HttpEntity对象,使用完InputStream到底需不需要手动关闭流呢?

    其实调用者不需要手动关闭流,因为HttpResponseProxy构造方法里有增强HttpEntity的处理方法,如下。

    调用者最终拿到的HttpEntity对象是ResponseEntityProxy实例。

    ResponseEntityProxy重写了获取InputStream的方法,返回的是EofSensorInputStream类型的InputStream对象。

    EofSensorInputStream对象每次读取都会调用checkEOF方法,判断是否已经读取完毕。

    checkEOF方法会调用ResponseEntityProxy(实现了EofSensorWatcher接口)对象的eofDetected方法。

    EofSensorWatcher#eofDetected方法中会释放连接并关闭流。

    综上,通过CloseableHttpClient实例处理请求,无需调用者手动释放连接

    HttpClient在Spring中应用

    创建ClientHttpRequestFactory

    @Bean
    public ClientHttpRequestFactory clientHttpRequestFactory()
            throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (arg0, arg1) -> true).build();
    
        httpClientBuilder.setSSLContext(sslContext)
                .setMaxConnTotal(MAX_CONNECTION_TOTAL)
                .setMaxConnPerRoute(ROUTE_MAX_COUNT)
                .evictIdleConnections(CONNECTION_IDLE_TIME_OUT, TimeUnit.MILLISECONDS);
    
        httpClientBuilder.setRetryHandler(new DefaultHttpRequestRetryHandler(RETRY_COUNT, true));
        httpClientBuilder.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy());
        CloseableHttpClient client = httpClientBuilder.build();
    
        HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(client);
        clientHttpRequestFactory.setConnectTimeout(CONNECTION_TIME_OUT);
        clientHttpRequestFactory.setReadTimeout(READ_TIME_OUT);
        clientHttpRequestFactory.setConnectionRequestTimeout(CONNECTION_REQUEST_TIME_OUT);
        clientHttpRequestFactory.setBufferRequestBody(false);
        return clientHttpRequestFactory;
    }

    创建RestTemplate

    @Bean
    public RestTemplate restTemplate()  {
       RestTemplate restTemplate = new RestTemplate(clientHttpRequestFactory());
            restTemplate.setErrorHandler(new DefaultResponseErrorHandler());
            // 修改StringHttpMessageConverter内容转换器
            restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8));
            return restTemplate;
    }

     Spring官网例子

    @SpringBootApplication
    public class Application {
    
        private static final Logger log = LoggerFactory.getLogger(Application.class);
    
        public static void main(String args[]) {
            SpringApplication.run(Application.class);
        }
    
        @Bean
        public RestTemplate restTemplate(RestTemplateBuilder builder) {
            return builder.build();
        }
    
        @Bean
        public CommandLineRunner run(RestTemplate restTemplate) throws Exception {
            return args -> {
                Quote quote = restTemplate.getForObject(
                        "https://gturnquist-quoters.cfapps.io/api/random", Quote.class);
                log.info(quote.toString());
            };
        }
    }

    应用场景和效果

    项目中主要应用场景

    微门户系统和腾讯、支付宝第三方服务交互,自定义专属Httpclient连接池。

    微门户内部系统之间交互,通用HttpClient连接池。尤其查询用户信息、和微币、手机号段、自助服务这种核心查询服务,量级非常大,一定要使用连接池。

    微门户系统和跨团队系统、跨部门系统使用CSF客户端,新版CSF客户端也是使用HttpClient连接池技术。

    连接池优点和效果提升

    连接池的连接资源重用策略,避免了频繁创建、释放连接引起的大量性能开销,增加了系统运行的平稳性。

    连接池统一连接管理策略,避免连接资源泄漏或者大量连接资源创建导致内存溢出。

    模拟并发场景,和腾讯使用连接池平均响应时间在50ms~80ms之间。为啥性能提升的这么明显呢?其主要原因就是省去了大量连接建立与释放的时间。

    总结

    Apache的HttpClient组件可谓良心之作,细细的品味一下源码可以学到很多设计模式和比编码规范。不过在阅读源码之前最好了解一下不同版本的HTTP协议,尤其是HTTP协议的Keep-Alive模式。使用Keep-Alive模式(又称持久连接、连接重用)时,Keep-Alive功能使客户端到服 务器端的连接持续有效,当出现对服务器的后继请求时,Keep-Alive功能避免了建立或者重新建立连接。这里推荐一篇参考链接:https://www.jianshu.com/p/49551bda6619。

    个人感觉分布式系统项目中都应该封装一套HttpClient工具类,针对常用参数、连接池信息、服务调用、出网代理、支持Https方式等合理包装,哪些业务可以共用一个HttpClient,哪些业务只能使用专属的HttpClient,避免误用导致生产环境发生无法预知的问题。

  • 【推广】 阿里云小站-上云优惠聚集地(新老客户同享)更有每天限时秒杀!
    【推广】 云服务器低至0.95折 1核2G ECS云服务器8.1元/月
    【推广】 阿里云老用户升级四重礼遇享6.5折限时折扣!
  • 原文:https://www.cnblogs.com/hujunzheng/p/11629198.html
走看看 - 开发者的网上家园