zoukankan      html  css  js  c++  java
  • 异步httpclient(httpasyncclient)的使用与总结

    参考:异步httpclient(httpasyncclient)的使用与总结

    1. 前言
    应用层的网络模型有同步与异步。同步意味当前线程是阻塞的,只有本次请求完成后才能进行下一次请求;异步意味着所有的请求可以同时塞入缓冲区,不阻塞当前的线程;

    httpclient在4.x之后开始提供基于nio的异步版本httpasyncclient,httpasyncclient借助了Java并发库和nio进行封装(虽说NIO是同步非阻塞IO,但是HttpAsyncClient提供了回调的机制,与netty类似,所以可以模拟类似于AIO的效果),其调用方式非常便捷,但是其中也有许多需要注意的地方。

    2. pom文件
    本文依赖4.1.2,当前最新的客户端版本是4.1.3maven repository 地址

    <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
    </dependency>

    <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.4.5</version>
    </dependency>

    <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore-nio</artifactId>
    <version>4.4.5</version>
    </dependency>

    <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpasyncclient</artifactId>
    <version>4.1.2</version>
    </dependency>
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    3. 简单的实例
    public class TestHttpClient {
    public static void main(String[] args){

    RequestConfig requestConfig = RequestConfig.custom()
    .setConnectTimeout(50000)
    .setSocketTimeout(50000)
    .setConnectionRequestTimeout(1000)
    .build();

    //配置io线程
    IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
    setIoThreadCount(Runtime.getRuntime().availableProcessors())
    .setSoKeepAlive(true)
    .build();
    //设置连接池大小
    ConnectingIOReactor ioReactor=null;
    try {
    ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
    } catch (IOReactorException e) {
    e.printStackTrace();
    }
    PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
    connManager.setMaxTotal(100);
    connManager.setDefaultMaxPerRoute(100);


    final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
    setConnectionManager(connManager)
    .setDefaultRequestConfig(requestConfig)
    .build();


    //构造请求
    String url = "http://127.0.0.1:9200/_bulk";
    HttpPost httpPost = new HttpPost(url);
    StringEntity entity = null;
    try {
    String a = "{ "index": { "_index": "test", "_type": "test"} } " +
    "{"name": "上海","age":33} ";
    entity = new StringEntity(a);
    } catch (UnsupportedEncodingException e) {
    e.printStackTrace();
    }
    httpPost.setEntity(entity);

    //start
    client.start();

    //异步请求
    client.execute(httpPost, new Back());

    while(true){
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    static class Back implements FutureCallback<HttpResponse>{

    private long start = System.currentTimeMillis();
    Back(){
    }

    public void completed(HttpResponse httpResponse) {
    try {
    System.out.println("cost is:"+(System.currentTimeMillis()-start)+":"+EntityUtils.toString(httpResponse.getEntity()));
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public void failed(Exception e) {
    System.err.println(" cost is:"+(System.currentTimeMillis()-start)+":"+e);
    }

    public void cancelled() {

    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    4. 几个重要的参数
    4.1 TimeOut(3个)的设置


    ConnectTimeout : 连接超时,连接建立时间,三次握手完成时间。
    SocketTimeout : 请求超时,数据传输过程中数据包之间间隔的最大时间。
    ConnectionRequestTimeout : 使用连接池来管理连接,从连接池获取连接的超时时间。

    在实际项目开发过程中,这三个值可根据具体情况设置。

    (1) 下面针对ConnectionRequestTimeout的情况进行分析

    实验条件:设置连接池最大连接数为1,每一个异步请求从开始到回调的执行时间在100ms以上;

    实验过程:连续发送2次请求

    public class TestHttpClient {
    public static void main(String[] args){

    RequestConfig requestConfig = RequestConfig.custom()
    .setConnectTimeout(50000)
    .setSocketTimeout(50000)
    .setConnectionRequestTimeout(10)//设置为10ms
    .build();

    //配置io线程
    IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
    setIoThreadCount(Runtime.getRuntime().availableProcessors())
    .setSoKeepAlive(true)
    .build();
    //设置连接池大小
    ConnectingIOReactor ioReactor=null;
    try {
    ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
    } catch (IOReactorException e) {
    e.printStackTrace();
    }
    PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
    connManager.setMaxTotal(1);//最大连接数设置1
    connManager.setDefaultMaxPerRoute(1);//per route最大连接数设置1


    final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
    setConnectionManager(connManager)
    .setDefaultRequestConfig(requestConfig)
    .build();


    //构造请求
    String url = "http://127.0.0.1:9200/_bulk";
    List<HttpPost> list = new ArrayList<HttpPost>();
    for(int i=0;i<2;i++){
    HttpPost httpPost = new HttpPost(url);
    StringEntity entity = null;
    try {
    String a = "{ "index": { "_index": "test", "_type": "test"} } " +
    "{"name": "上海","age":33} ";
    entity = new StringEntity(a);
    } catch (UnsupportedEncodingException e) {
    e.printStackTrace();
    }
    httpPost.setEntity(entity);
    list.add(httpPost);
    }

    client.start();

    for(int i=0;i<2;i++){
    client.execute(list.get(i), new Back());
    }

    while(true){
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    static class Back implements FutureCallback<HttpResponse>{

    private long start = System.currentTimeMillis();
    Back(){
    }

    public void completed(HttpResponse httpResponse) {
    try {
    System.out.println("cost is:"+(System.currentTimeMillis()-start)+":"+EntityUtils.toString(httpResponse.getEntity()));
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public void failed(Exception e) {
    e.printStackTrace();
    System.err.println(" cost is:"+(System.currentTimeMillis()-start)+":"+e);
    }

    public void cancelled() {

    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    实验结果 :
    第一次请求执行时间在200ms左右
    第二请求回调直接抛出TimeOutException


    java.util.concurrent.TimeoutException
    at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:364)
    at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:344)
    at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:318)
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:303)
    at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.releaseConnection(AbstractClientExchangeHandler.java:239)
    at org.apache.http.impl.nio.client.MainClientExec.responseCompleted(MainClientExec.java:387)
    at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:168)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:745)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    结果分析:由于连接池大小是1,第一次请求执行后连接被占用(时间在100ms),第二次请求在规定的时间内无法获取连接,于是直接连接获取的TimeOutException

    (2) 修改ConnectionRequestTimeout

    RequestConfig requestConfig = RequestConfig.custom()
    .setConnectTimeout(50000)
    .setSocketTimeout(50000)
    .setConnectionRequestTimeout(1000)//设置为1000ms
    .build();
    1
    2
    3
    4
    5
    上述两次请求正常执行。

    下面进一步看一下代码中抛异常的地方:

    从上面的代码中可以看到如果要设置永不ConnectionRequestTimeout,只需要将ConnectionRequestTimeout设置为小于0即可,当然后这种设置一定要慎用, 如果处理不当,请求堆积会导致OOM。

    4.2 连接池大小的设置


    ConnTotal:连接池中最大连接数;
    ConnPerRoute(1000):分配给同一个route(路由)最大的并发连接数,route为运行环境机器到目标机器的一条线路,举例来说,我们使用HttpClient的实现来分别请求 www.baidu.com 的资源和 www.bing.com 的资源那么他就会产生两个route;

    对于上述的实验,在一定程度上可以通过增大最大连接数来解决ConnectionRequestTimeout的问题!

    后续:本文重点在于使用,后续会对源码进行分析与解读

    HttpAsyncClient 的简单使用

    下载地址:http://hc.apache.org/downloads.cgi

    在NetBeans中导入以下jar文件:

    1:一次请求:

    复制代码
     public static void oneReuest(){
            final CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
            httpClient.start();
            final HttpGet request = new HttpGet("http://www.apache.org/");
            final Future future = httpClient.execute(request, null);
            try {
                HttpResponse response = (HttpResponse) future.get();
                System.out.println("Response:" + response.getStatusLine());
                System.out.println("Shutting down");
            } catch (Exception ex) {
                Logger.getLogger(Httpasyncclient.class.getName()).log(Level.SEVERE, null, ex);
            }finally{
                try {
                    httpClient.close();
                } catch (IOException ex) {
                    Logger.getLogger(Httpasyncclient.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
            
            System.out.println("执行完毕");
        }
    复制代码

    2:多次异步请求:

    复制代码
     public static void moreRequest(){
            final RequestConfig requestConfitg = RequestConfig.custom()
                    .setSocketTimeout(3000)
                    .setConnectTimeout(3000).build();
            
            final CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom()
                    .setDefaultRequestConfig(requestConfitg)
                    .build();
            
            httpClient.start();
            
            final HttpGet[] requests = new HttpGet[]{
                new HttpGet("http://www.apache.org/"),
                new HttpGet("http://www.baidu.com/"),
                new HttpGet("http://www.oschina.net/")
            };
            
            final CountDownLatch latch = new CountDownLatch(requests.length);
            for(final HttpGet request: requests){
                
                    httpClient.execute(request, new FutureCallback(){
                        @Override
                        public void completed(Object obj) {
                           final HttpResponse response = (HttpResponse)obj;
                           latch.countDown();
                           System.out.println(request.getRequestLine() + "->" + response.getStatusLine());
                        }
    
                        @Override
                        public void failed(Exception excptn) {
                            latch.countDown();
                            System.out.println(request.getRequestLine() + "->" + excptn);
                        }
    
                        @Override
                        public void cancelled() {
                            latch.countDown();
                            System.out.println(request.getRequestLine() + "cancelled");
                        }
                    });
             }       
            
            try {
                latch.await();
                System.out.println("Shutting Down");
            } catch (InterruptedException ex) {
                Logger.getLogger(Httpasyncclient.class.getName()).log(Level.SEVERE, null, ex);
            }finally{
                try {
                    httpClient.close();
                } catch (IOException ex) {
                    Logger.getLogger(Httpasyncclient.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
            System.out.println("Finish!");
        }
    复制代码

    运行结果:

    复制代码
    run:
    GET http://www.baidu.com/ HTTP/1.1->HTTP/1.1 200 OK
    GET http://www.oschina.net/ HTTP/1.1->HTTP/1.1 200 OK
    GET http://www.apache.org/ HTTP/1.1->HTTP/1.1 200 OK
    Shutting Down
    Finish!
    成功构建 (总时间: 2 秒)

    可以看出是异步执行的!不是按照我们传入的URL参数顺序执行的!
    复制代码

    篇提到了高性能处理的关键是异步,而我们当中许多人依旧在使用同步模式的HttpClient访问第三方Web资源,我认为原因之一是:异步的HttpClient诞生较晚,许多人不知道;另外也可能是大多数Web程序其实不在意这点性能损失了。

    而要自己实现一个异步的HttpClient则比较困难,通常都是自己开一个新的工作线程,利用HttpClient的同步去访问,完成后再回调这种形式,这样做其实不是真正的异步,因为依旧会有一个线程处于阻塞中,等待着第三方Web资源的返回。

    而如今访问第三方Web资源的情景越来越多,最典型就是使用第三方登录平台,如QQ或微信等,我们需要访问腾讯的服务器去验证登录者的身份,根据我的经验,这个过程可能会阻塞好几秒钟,可看作是一个“长时间调用”,所以最好要使用异步方式。

    OK,废话少说,要使用异步的HttpClient,请Maven中带上:

            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpasyncclient</artifactId>
                <version>4.1.1</version>
            </dependency>

    接下来是一个完整的Demo代码:

    复制代码
    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.concurrent.FutureCallback;
    import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
    import org.apache.http.impl.nio.client.HttpAsyncClients;
    import org.apache.http.util.EntityUtils;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class Main {
        public static void main(String[] argv) {
            CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
            httpclient.start();
    
            final CountDownLatch latch = new CountDownLatch(1);
            final HttpGet request = new HttpGet("https://www.alipay.com/");
    
            System.out.println(" caller thread id is : " + Thread.currentThread().getId());
    
            httpclient.execute(request, new FutureCallback<HttpResponse>() {
    
                public void completed(final HttpResponse response) {
                    latch.countDown();
                    System.out.println(" callback thread id is : " + Thread.currentThread().getId());
                    System.out.println(request.getRequestLine() + "->" + response.getStatusLine());
                    try {
                        String content = EntityUtils.toString(response.getEntity(), "UTF-8");
                        System.out.println(" response content is : " + content);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
                public void failed(final Exception ex) {
                    latch.countDown();
                    System.out.println(request.getRequestLine() + "->" + ex);
                    System.out.println(" callback thread id is : " + Thread.currentThread().getId());
                }
    
                public void cancelled() {
                    latch.countDown();
                    System.out.println(request.getRequestLine() + " cancelled");
                    System.out.println(" callback thread id is : " + Thread.currentThread().getId());
                }
    
            });
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            try {
                httpclient.close();
            } catch (IOException ignore) {
    
            }
        }
    }
    复制代码

     呃……代码很简单,好像也没什么好说的了,稍作封装就可以实现如“getJson()”这样的方法。

    也许你还注意到了,这个HttpClient跟同步的版本一样,直接支持https,但如果网站的证书是自签的,默认还是不行的,解决方法当然有,但代码有些麻烦,我觉得还不如直接买张证书来得简单,如果网站是你管的话。

    参考:Java的异步HttpClient

    参考:HttpAsyncClient 的简单使用

  • 相关阅读:
    一个网站需求说明书的示例
    产品设计与PRD介绍
    研发效能度量案例
    项目管理过程流程图
    变量 $cfg['TempDir'] (./tmp/)无法访问。phpMyAdmin无法缓存模板文件,所以会运行缓慢。
    wordpress函数大全列表整理
    PCLZIP_ERR_BAD_FORMAT (-10) : Unable to find End of Central Dir Record signature
    通过写脚本的方式自动获取JVM内的进程堆栈信息等内容
    简单定位占用最高CPU的java进程信息
    使用linux上面powershell安装vm powercli 连接vcenter 通过计划任务自动创建部分虚拟机的快照以及自动清理过期快照的办法
  • 原文地址:https://www.cnblogs.com/aspirant/p/10333915.html
Copyright © 2011-2022 走看看