zoukankan      html  css  js  c++  java
  • httpClient多线程请求

    使用 HttpClient 可以模拟请求 URL 获取资源,但单线程请求在速度上有一定的限制。参考 Apache 给出的例子,我自己做了测试,实现了多线程并发请求。以下代码需要 HttpClient 4.2 的包(第 2 节的异步请求示例还需要 HttpAsyncClient 的包),可以在 http://hc.apache.org/downloads.cgi 下载。

    1、并发请求

    package generate.httpclient;
    
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.apache.http.HttpEntity;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.HttpClient;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.conn.ClientConnectionManager;
    import org.apache.http.conn.params.ConnManagerParams;
    import org.apache.http.conn.scheme.PlainSocketFactory;
    import org.apache.http.conn.scheme.Scheme;
    import org.apache.http.conn.scheme.SchemeRegistry;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.http.impl.conn.PoolingClientConnectionManager;
    import org.apache.http.params.BasicHttpParams;
    import org.apache.http.params.HttpConnectionParams;
    import org.apache.http.params.HttpParams;
    import org.apache.http.protocol.BasicHttpContext;
    import org.apache.http.protocol.HttpContext;
    import org.apache.http.util.EntityUtils;
    
    /**
     * Demo that fetches a list of URLs concurrently: a fixed-size thread pool
     * runs one {@link GetThread} task per URL, and all tasks share a single
     * {@link HttpClient} backed by a pooling connection manager.
     */
    public class ThreadPoolHttpClient {
        /** Number of worker threads in the pool. */
        private static final int POOL_SIZE = 20;
        /** Worker pool; created by {@link #test()} and shut down before it returns. */
        private ExecutorService exe = null;
        /** URLs to request; {@code null} is treated as "no URLs". */
        String[] urls=null;

        /**
         * @param urls the URLs to GET; the array is used as-is, not copied
         */
        public ThreadPoolHttpClient(String[] urls){
            this.urls=urls;
        }

        /**
         * Submits one GET task per URL to the pool, waits for all of them to
         * finish, releases the pool and the connection manager, then prints
         * {@code Done}.
         *
         * @throws Exception if a URL cannot be turned into a request or the
         *         calling thread is interrupted while waiting for the tasks
         */
        public void test() throws Exception {
            final String[] urisToGet = (urls == null) ? new String[0] : urls;

            HttpParams params = new BasicHttpParams();
            // Timeout for obtaining a connection from the pool.
            ConnManagerParams.setTimeout(params, 1000);
            // Timeout for establishing the connection.
            HttpConnectionParams.setConnectionTimeout(params, 2000);
            // Socket timeout while waiting for data.
            HttpConnectionParams.setSoTimeout(params, 4000);

            SchemeRegistry schemeRegistry = new SchemeRegistry();
            schemeRegistry.register(
                    new Scheme("http", 80, PlainSocketFactory.getSocketFactory()));

            // NOTE(review): the pool allows 10 connections while POOL_SIZE is 20, so
            // tasks may time out waiting for a connection -- confirm this is intended.
            PoolingClientConnectionManager cm = new PoolingClientConnectionManager(schemeRegistry);
            cm.setMaxTotal(10);
            final HttpClient httpClient = new DefaultHttpClient(cm, params);

            exe = Executors.newFixedThreadPool(POOL_SIZE);
            try {
                for (int i = 0; i < urisToGet.length; i++) {
                    System.out.println(i);
                    exe.execute(new GetThread(httpClient, new HttpGet(urisToGet[i])));
                }
                // Stop accepting work and wait for the queued requests; without this the
                // non-daemon pool threads keep the JVM alive and "Done" is printed early.
                exe.shutdown();
                while (!exe.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES)) {
                    // Still running; every request is bounded by the timeouts above.
                }
            } finally {
                // No-op after a normal shutdown; cancels outstanding tasks on failure.
                exe.shutdownNow();
                cm.shutdown();
            }
            System.out.println("Done");
        }

        /**
         * A single GET request. It extends {@link Thread} so it can be started
         * directly, but {@link ThreadPoolHttpClient#test()} hands it to an
         * executor, which only uses it as a {@link Runnable}.
         */
        static class GetThread extends Thread{

            private final HttpClient httpClient;
            private final HttpContext context;
            private final HttpGet httpget;

            /**
             * @param httpClient shared client used to execute the request
             * @param httpget    the request to execute
             */
            public GetThread(HttpClient httpClient, HttpGet httpget) {
                this.httpClient = httpClient;
                this.context = new BasicHttpContext();
                this.httpget = httpget;
            }

            /** Names the executing thread, pauses for five seconds, then performs the request. */
            @Override
            public void run(){
                // Name the thread that actually runs the task: inside an executor that is
                // the pool worker, not this (never started) Thread object.
                Thread.currentThread().setName("threadsPoolClient");
                try {
                    // Delay kept from the original demo; its purpose is not evident from the code.
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // Restore the flag and skip the request so a cancelled pool stops promptly.
                    Thread.currentThread().interrupt();
                    return;
                }
                get();
            }

            /**
             * Executes the request, prints the status line when the response has
             * an entity, and always releases the connection. Failures are
             * reported on {@code System.err} instead of being dropped.
             */
            public void get() {
                try {
                    HttpResponse response = this.httpClient.execute(this.httpget, this.context);
                    HttpEntity entity = response.getEntity();
                    if (entity != null) {
                        System.out.println(this.httpget.getURI()+": status"+response.getStatusLine().toString());
                    }
                    // Drain the body so the connection goes back to the manager.
                    EntityUtils.consume(entity);
                } catch (Exception ex) {
                    this.httpget.abort();
                    System.err.println(this.httpget.getURI() + ": request failed: " + ex);
                } finally {
                    httpget.releaseConnection();
                }
            }
        }
    }
    并发请求

    2、多线程异步请求

    package generate.httpclient;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.concurrent.FutureCallback;
    import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
    import org.apache.http.nio.client.HttpAsyncClient;
    import org.apache.http.nio.reactor.IOReactorException;
    
    /**
     * Issues HTTP GET requests for a list of URLs through the non-blocking
     * {@link HttpAsyncClient} and collects the status line of each response.
     * Failed requests are recorded in a {@link HandlerFailThread}.
     */
    public class AsynClient{
        /** URLs to request, in submission order. */
        private List<String> urls;
        /** Records the URLs whose request failed. */
        private HandlerFailThread failHandler;

        /**
         * @param list the URLs to GET; the list is used as-is, not copied
         */
        public AsynClient(List<String> list){
            failHandler=new HandlerFailThread();
            urls=list;
        }

        /**
         * Sends one asynchronous GET per URL, blocks until every request has
         * completed, failed or been cancelled, then shuts the client down and
         * prints the recorded failures.
         *
         * @return map from request URI to the response status line; a URL that
         *         appears several times in the input yields a single entry
         * @throws IOReactorException   if the async client cannot be created
         * @throws InterruptedException if the calling thread is interrupted while
         *                              waiting for the requests or for shutdown
         */
        public Map<String,String> asynGet() throws IOReactorException,
                InterruptedException {
            // Build the requests before starting the client so that a malformed URL
            // cannot leave a started client behind.
            final HttpGet[] requests = new HttpGet[urls.size()];
            int next = 0;
            for (String url : urls) {
                requests[next++] = new HttpGet(url);
            }

            final CountDownLatch latch = new CountDownLatch(requests.length);
            final Map<String, String> responseMap=new HashMap<String, String>();

            final HttpAsyncClient httpclient = new DefaultHttpAsyncClient();
            httpclient.start();
            int submitted = 0;
            try {
                for (final HttpGet request : requests) {
                    httpclient.execute(request, new FutureCallback<HttpResponse>() {

                        public void completed(final HttpResponse response) {
                            try {
                                String status = response.getStatusLine().toString();
                                // Callbacks may run on several threads; HashMap is not thread-safe.
                                synchronized (responseMap) {
                                    responseMap.put(request.getURI().toString(), status);
                                }
                                System.out.println(request.getRequestLine() + "->"
                                        + response.getStatusLine()+"->");
                            } finally {
                                // Count down last so the waiting caller never sees a map
                                // that is missing this result.
                                latch.countDown();
                            }
                        }

                        public void failed(final Exception ex) {
                            try {
                                ex.printStackTrace();
                                failHandler.putFailUrl(request.getURI().toString(),
                                        ex.getMessage());
                            } finally {
                                latch.countDown();
                            }
                        }

                        public void cancelled() {
                            latch.countDown();
                        }

                    });
                    submitted++;
                }
                System.out.println("Doing...");
            } finally {
                // If submission stopped early, release the latch for the requests that
                // never got a callback; otherwise await() would block forever.
                for (int k = submitted; k < requests.length; k++) {
                    latch.countDown();
                }
                try {
                    latch.await();
                } finally {
                    httpclient.shutdown();
                }
            }
            System.out.println("Done");
            failHandler.printFailUrl();
            return responseMap;
        }

        /**
         * Reads the stream to its end and returns the bytes decoded with the
         * platform default charset. Currently unused.
         *
         * @param input stream to drain; it is not closed here
         * @return the stream content as a string
         * @throws IOException if reading fails
         */
        private String readInputStream(InputStream input) throws IOException{
            byte[] buffer = new byte[128];
            int len = 0;
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            while((len = input.read(buffer)) >= 0) {
                bytes.write(buffer, 0, len);
            }
            return bytes.toString();
        }

        /**
         * Manual test: requests the same local URL 3072 times (3 entries doubled
         * ten times).
         *
         * @param args unused
         */
        public static void main(String[] args) {
            List<String> urls=new ArrayList<String>();
            urls.add("http://127.0.0.1/examples/servlets/");
            urls.add("http://127.0.0.1/examples/servlets/");
            urls.add("http://127.0.0.1/examples/servlets/");
            for(int i=0;i<10;i++){
                urls.addAll(urls);
            }
            System.out.println(urls.size());
            AsynClient client=new AsynClient(urls);
            try {
                client.asynGet();
            } catch (IOReactorException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever runs this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            System.out.println("done");
        }
    }
    多线程异步请求

    3、创建一个线程记录失败的请求

    package generate.httpclient;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Collects the URLs whose request failed, each with a status or error
     * text, so they can be printed once a batch has finished.
     */
    public class HandlerFailThread  extends Thread{
        /** Failed URL mapped to its status/error text; guarded by its own monitor. */
        final Map<String, String> failUrl=new HashMap<String, String>();

        /**
         * Records a failed URL, replacing any earlier entry for the same URL.
         *
         * @param url    the URL that failed
         * @param status status line or error message; may be {@code null}
         */
        public void putFailUrl(String url,String status){
            synchronized (failUrl) {
                failUrl.put(url,status);
            }
        }

        /**
         * Keeps the thread alive until it is interrupted. It sleeps between
         * checks instead of spinning, so a started handler does not occupy a
         * CPU core and can be stopped with {@link #interrupt()}.
         */
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // sleep() cleared the flag; set it again so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Prints every recorded failure to standard output. */
        public void printFailUrl(){
            // Same lock as putFailUrl, so a concurrent put cannot corrupt the iteration.
            synchronized (failUrl) {
                for(Map.Entry<String, String> m: failUrl.entrySet()){
                    System.out.println("****fail:url:"+m.getKey()+ "  code :"+m.getValue());
                }
            }
        }
    }
    线程记录失败的请求

    异步请求也可以通过连接池(pool)来管理,例如:

     // Example: create the async client on top of a pooling connection manager.
     ConnectingIOReactor nio=new DefaultConnectingIOReactor();
      PoolingClientAsyncConnectionManager manager=new PoolingClientAsyncConnectionManager(nio);
      // Pool limits: total connections, and the default limit per route.
      manager.setMaxTotal(1000);
      manager.setDefaultMaxPerRoute(100);
      HttpParams params=new BasicHttpParams();
      /* Connection timeout */ 
      HttpConnectionParams.setConnectionTimeout(params, 10000); 
      /* Request (socket) timeout */
      HttpConnectionParams.setSoTimeout(params, 60*1000);
      // NOTE(review): params is never passed to the client constructor below --
      // confirm that these timeouts actually reach the client.
      DefaultHttpAsyncClient.setDefaultHttpParams(params);
      final HttpAsyncClient httpclient = new DefaultHttpAsyncClient(manager);
      httpclient.start();

    HttpClient 的相关内容可参看以下文档,里面有很多说明与示例:

    http://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html

     

  • 相关阅读:
    bzoj3505 数三角形 组合计数
    cogs2057 殉国 扩展欧几里得
    cogs333 荒岛野人 扩展欧几里得
    bzoj1123 BLO tarjan求点双连通分量
    poj3352 road construction tarjan求双连通分量
    cogs1804 联合权值 dp
    cogs2478 简单的最近公共祖先 树形dp
    cogs1493 递推关系 矩阵
    cogs2557 天天爱跑步 LCA
    hdu4738 Caocao's Bridge Tarjan求割边
  • 原文地址:https://www.cnblogs.com/Struts-pring/p/5145694.html
Copyright © 2011-2022 走看看