zoukankan      html  css  js  c++  java
  • httpClient多线程请求

    使用 HttpClient 可以模拟请求 URL 获取资源,但单线程请求在速度上会受到一定限制。这里参考 Apache 给出的例子,自己做了测试,实现了多线程并发请求。以下代码需要 HttpClient 4.2 的包(第 2 节的异步请求还需要 HttpAsyncClient 的包),可以在 http://hc.apache.org/downloads.cgi 下载。

    1、并发请求

    package generate.httpclient;
    
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.http.HttpEntity;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.HttpClient;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.conn.ClientConnectionManager;
    import org.apache.http.conn.params.ConnManagerParams;
    import org.apache.http.conn.scheme.PlainSocketFactory;
    import org.apache.http.conn.scheme.Scheme;
    import org.apache.http.conn.scheme.SchemeRegistry;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.http.impl.conn.PoolingClientConnectionManager;
    import org.apache.http.params.BasicHttpParams;
    import org.apache.http.params.HttpConnectionParams;
    import org.apache.http.params.HttpParams;
    import org.apache.http.protocol.BasicHttpContext;
    import org.apache.http.protocol.HttpContext;
    import org.apache.http.util.EntityUtils;
    
    /**
     * Issues HTTP GET requests for a fixed set of URLs concurrently, using a
     * fixed-size thread pool on top of one shared, pooled {@link HttpClient}.
     *
     * <p>Only the {@code http} scheme is registered, so {@code https} URLs are
     * not supported by this class.
     */
    public class ThreadPoolHttpClient {
        /** Number of worker threads; also used to size the connection pool. */
        private static final int POOL_SIZE = 20;
        /**
         * Delay in milliseconds each task waits before sending its request.
         * NOTE(review): kept from the original code; its purpose is not evident
         * from this file - confirm whether it is still wanted.
         */
        private static final long START_DELAY_MS = 5000;

        /** Thread pool that runs the request tasks; created by {@link #test()}. */
        private ExecutorService exe = null;
        /** URLs to request; may be {@code null}, which is treated as "no URLs". */
        String[] urls=null;

        /**
         * @param urls the URLs to request with HTTP GET
         */
        public ThreadPoolHttpClient(String[] urls){
            this.urls=urls;
        }

        /**
         * Submits one GET task per URL to the thread pool, waits until every
         * task has finished, then releases the pool and all pooled connections.
         *
         * @throws Exception if a URL is not a valid URI, or if the calling thread
         *         is interrupted while waiting for the tasks to finish
         */
        public void test() throws Exception {
            final String[] urisToGet = (urls == null) ? new String[0] : urls;

            HttpParams params =new BasicHttpParams();
            /* Timeout for obtaining a connection from the pool. */
            ConnManagerParams.setTimeout(params, 1000);
            /* Connection timeout. */
            HttpConnectionParams.setConnectionTimeout(params, 2000);
            /* Socket (read) timeout. */
            HttpConnectionParams.setSoTimeout(params, 4000);
            SchemeRegistry schemeRegistry = new SchemeRegistry();
            schemeRegistry.register(
                    new Scheme("http", 80, PlainSocketFactory.getSocketFactory()));

            PoolingClientConnectionManager cm=new PoolingClientConnectionManager(schemeRegistry);
            // Give every worker its own connection, also when all URLs share one
            // host; otherwise workers queue for a connection and hit the
            // 1-second pool timeout configured above.
            cm.setMaxTotal(POOL_SIZE);
            cm.setDefaultMaxPerRoute(POOL_SIZE);
            final HttpClient httpClient = new DefaultHttpClient(cm,params);

            // A fixed pool is used instead of one thread per URL, which does not
            // scale to long URL lists.
            exe = Executors.newFixedThreadPool(POOL_SIZE);
            try {
                for (int i = 0; i < urisToGet.length; i++) {
                    System.out.println(i);
                    HttpGet httpget = new HttpGet(urisToGet[i]);
                    exe.execute(new GetThread(httpClient, httpget));
                }
                // Stop accepting work and wait for the submitted requests, so
                // that "Done" really means done and the JVM is able to exit.
                exe.shutdown();
                exe.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            } finally {
                // No-op after a normal run; cancels the remaining tasks when
                // submission failed or the wait was interrupted.
                exe.shutdownNow();
                httpClient.getConnectionManager().shutdown();
            }
            System.out.println("Done");
        }

        /**
         * A single GET request. Although this class extends {@link Thread}, it
         * is handed to the executor as a plain task and is not started itself.
         */
        static class GetThread extends Thread{

            private final HttpClient httpClient;
            private final HttpContext context;
            private final HttpGet httpget;

            /**
             * @param httpClient shared client used to execute the request
             * @param httpget    the request to execute
             */
            public GetThread(HttpClient httpClient, HttpGet httpget) {
                this.httpClient = httpClient;
                this.context = new BasicHttpContext();
                this.httpget = httpget;
            }

            /** Waits {@code START_DELAY_MS}, then performs the request. */
            @Override
            public void run(){
                // Names this object; when run by an executor the worker thread
                // keeps its own name.
                this.setName("threadsPoolClient");
                try {
                    Thread.sleep(START_DELAY_MS);
                } catch (InterruptedException e) {
                    // The pool is being shut down: keep the interrupt flag set
                    // and skip the request.
                    Thread.currentThread().interrupt();
                    this.httpget.abort();
                    return;
                }
                get();
            }

            /**
             * Executes the request, prints its status line and always releases
             * the connection. A failure is reported on standard error instead of
             * being thrown.
             */
            public void get() {
                try {
                    HttpResponse response = this.httpClient.execute(this.httpget, this.context);
                    HttpEntity entity = response.getEntity();
                    // Printed even when the response carries no body.
                    System.out.println(this.httpget.getURI()+": status"+response.getStatusLine().toString());
                    // ensure the connection gets released to the manager
                    EntityUtils.consume(entity);
                } catch (Exception ex) {
                    this.httpget.abort();
                    System.err.println(this.httpget.getURI()+": request failed: "+ex);
                }finally{
                    httpget.releaseConnection();
                }
            }
        }
    }
    并发请求

    2、多线程异步请求

    package generate.httpclient;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.concurrent.FutureCallback;
    import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
    import org.apache.http.nio.client.HttpAsyncClient;
    import org.apache.http.nio.reactor.IOReactorException;
    
    /**
     * Sends HTTP GET requests for a list of URLs through the non-blocking
     * {@link HttpAsyncClient} and collects the status line of each response.
     */
    public class AsynClient{
        /** URLs to request; never {@code null}. */
        private List<String> urls;
        /** Collects the URLs whose request failed, with the failure reason. */
        private HandlerFailThread failHandler;

        /**
         * @param list the URLs to request; {@code null} is treated as an empty list
         */
        public AsynClient(List<String> list){
            failHandler=new HandlerFailThread();
            urls = (list == null) ? new ArrayList<String>() : list;
        }

        /**
         * Requests every URL asynchronously and blocks until each request has
         * completed, failed or been cancelled.
         *
         * @return a map from URL to the status line of its response; a URL that
         *         appears several times in the input has a single entry
         * @throws IOReactorException   if the asynchronous client cannot be created
         * @throws InterruptedException if the calling thread is interrupted while
         *         waiting for the requests or while shutting the client down
         */
        public Map<String,String> asynGet() throws IOReactorException,
                InterruptedException {
            // Build the requests first: a malformed URL then fails before any
            // client resources have been started.
            HttpGet[] requests = new HttpGet[urls.size()];
            int i=0;
            for(String url : urls){
                requests[i]=new HttpGet(url);
                i++;
            }
            final HttpAsyncClient httpclient = new DefaultHttpAsyncClient();
            httpclient.start();
            final CountDownLatch latch = new CountDownLatch(requests.length);
            final Map<String, String> responseMap=new HashMap<String, String>();
            int submitted = 0;
            try {
                for (final HttpGet request : requests) {
                    httpclient.execute(request, new FutureCallback<HttpResponse>() {

                        public void completed(final HttpResponse response) {
                            try {
                                String status = response.getStatusLine().toString();
                                // Callbacks run on the client's threads, so the
                                // shared map is guarded by its own monitor.
                                synchronized (responseMap) {
                                    responseMap.put(request.getURI().toString(), status);
                                }
                                System.out.println(request.getRequestLine() + "->"
                                        + response.getStatusLine()+"->");
                            } catch (Exception e) {
                                failHandler.putFailUrl(request.getURI().toString(),
                                        String.valueOf(response.getStatusLine()));
                                e.printStackTrace();
                            } finally {
                                // Count down last, so the caller cannot resume
                                // before this result has been stored.
                                latch.countDown();
                            }
                        }

                        public void failed(final Exception ex) {
                            try {
                                ex.printStackTrace();
                                failHandler.putFailUrl(request.getURI().toString(),
                                        ex.getMessage());
                            } finally {
                                latch.countDown();
                            }
                        }

                        public void cancelled() {
                            latch.countDown();
                        }

                    });
                    submitted++;
                }
                System.out.println("Doing...");
            } finally {
                // If submission stopped early, release the latch for the
                // requests that were never handed to the client; otherwise
                // await() would block forever.
                for (int k = submitted; k < requests.length; k++) {
                    latch.countDown();
                }
                try {
                    latch.await();
                } finally {
                    httpclient.shutdown();
                }
            }
            System.out.println("Done");
            failHandler.printFailUrl();
            return responseMap;
        }

        /**
         * Reads the stream to its end and returns the bytes as a string decoded
         * with the platform default charset.
         *
         * @param input the stream to read; it is not closed by this method
         * @return the decoded content
         * @throws IOException if reading fails
         */
        private String readInputStream(InputStream input) throws IOException{
            byte[] buffer = new byte[128];
            int len = 0;
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            while((len = input.read(buffer)) >= 0) {
                bytes.write(buffer, 0, len);
            }
            return bytes.toString();
        }

        /**
         * Test driver: requests the same local URL 3 * 2^10 = 3072 times.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            List<String> urls=new ArrayList<String>();
            urls.add("http://127.0.0.1/examples/servlets/");
            urls.add("http://127.0.0.1/examples/servlets/");
            urls.add("http://127.0.0.1/examples/servlets/");
            // Each pass doubles the list.
            for(int i=0;i<10;i++){
                urls.addAll(urls);
            }
            System.out.println(urls.size());
            AsynClient client=new AsynClient(urls);
            try {
                client.asynGet();
            } catch (IOReactorException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever runs this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            System.out.println("done");
        }
    }
    多线程异步请求

    3、创建一个线程记录失败的请求

    package generate.httpclient;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Records the URLs of failed requests together with a status or error
     * message. Recording and printing are both guarded by the map's monitor, so
     * they may be called from different threads.
     */
    public class HandlerFailThread  extends Thread{
        /** Failed URL -> status or error message; guarded by its own monitor. */
        final Map<String, String> failUrl=new HashMap<String, String>();

        /**
         * Records a failed URL; a later failure of the same URL replaces the
         * earlier entry.
         *
         * @param url    the URL whose request failed
         * @param status the status line or error message; may be {@code null}
         */
        public void putFailUrl(String url,String status){
            synchronized (failUrl) {
                failUrl.put(url,status);
            }
        }

        /**
         * Keeps the thread alive until it is interrupted. The thread sleeps
         * instead of spinning, so it uses no CPU while idle.
         */
        @Override
        public void run() {
            while(!Thread.currentThread().isInterrupted()){
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Prints every recorded failure to standard output, one per line. */
        public void printFailUrl(){
            // Same lock as putFailUrl, so a concurrent failure cannot break the
            // iteration.
            synchronized (failUrl) {
                for(Map.Entry<String, String> m: failUrl.entrySet()){
                    System.out.println("****fail:url:"+m.getKey()+ "  code :"+m.getValue());
                }
            }
        }
    }
    线程记录失败的请求

    异步请求也可以通过连接池(pool)来管理,例如:

     // Example fragment: an async client built on a pooling connection manager.
     ConnectingIOReactor nio=new DefaultConnectingIOReactor();
      PoolingClientAsyncConnectionManager manager=new PoolingClientAsyncConnectionManager(nio);
      // Pool limits: 1000 connections in total, 100 as the per-route default.
      manager.setMaxTotal(1000);
      manager.setDefaultMaxPerRoute(100);
      HttpParams params=new BasicHttpParams();
      /* connection timeout */ 
      HttpConnectionParams.setConnectionTimeout(params, 10000); 
      /* request timeout */
      HttpConnectionParams.setSoTimeout(params, 60*1000);
      // NOTE(review): params is not passed to the constructor below; presumably
      // this call makes it the client's default - confirm against the
      // HttpAsyncClient version in use.
      DefaultHttpAsyncClient.setDefaultHttpParams(params);
      final HttpAsyncClient httpclient = new DefaultHttpAsyncClient(manager);
      httpclient.start();

    HttpClient 连接管理的相关内容可参看下面的官方教程,里面有很多说明与例子:

    http://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html

     

  • 相关阅读:
    【Scala】看代码,初步了解Apply方法
    【Scala】通过简洁代码搞明白伴生关系、主构造器和辅助构造器的关系
    【Scala】关于集合的各种知识点
    【Scala】新手入门,基础语法概览
    C#中的异步多线程13 回调
    C#中的异步多线程12 轮询
    C#中的异步多线程11 等待直到结束
    C#中的异步多线程10 BackgroundWorker类
    C#中的异步多线程9 完整的GUI示例
    C#中的异步多线程8 Task.Yield
  • 原文地址:https://www.cnblogs.com/Struts-pring/p/5145694.html
Copyright © 2011-2022 走看看