zoukankan      html  css  js  c++  java
  • 异步HttpClient大量请求

    由于项目中有用到HttpClient异步发送大量http请求,所以做已记录

    思路:使用HttpClient连接池,多线程

    public class HttpAsyncClient {
        private static int socketTimeout = 500;// 设置等待数据超时时间0.5秒钟 根据业务调整
    
        private static int connectTimeout = 2000;// 连接超时
    
        private static int poolSize = 100;// 连接池最大连接数
    
        private static int maxPerRoute = 100;// 每个主机的并发最多只有1500
    
        private static int connectionRequestTimeout = 3000; //从连接池中后去连接的timeout时间
    
        // http代理相关参数
        private String host = "";
        private int port = 0;
        private String username = "";
        private String password = "";
    
        // 异步httpclient
        private CloseableHttpAsyncClient asyncHttpClient;
    
        // 异步加代理的httpclient
        private CloseableHttpAsyncClient proxyAsyncHttpClient;
    
        public HttpAsyncClient() {
            try {
                this.asyncHttpClient = createAsyncClient(false);
                this.proxyAsyncHttpClient = createAsyncClient(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
        public CloseableHttpAsyncClient createAsyncClient(boolean proxy)
                throws KeyManagementException, UnrecoverableKeyException,
                NoSuchAlgorithmException, KeyStoreException,
                MalformedChallengeException, IOReactorException {
    
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectionRequestTimeout(connectionRequestTimeout)
                    .setConnectTimeout(connectTimeout)
                    .setSocketTimeout(socketTimeout).build();
    
            SSLContext sslcontext = SSLContexts.createDefault();
    
            UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(
                    username, password);
    
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, credentials);
    
            // 设置协议http和https对应的处理socket链接工厂的对象
            Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder
                    .<SchemeIOSessionStrategy> create()
                    .register("http", NoopIOSessionStrategy.INSTANCE)
                    .register("https", new SSLIOSessionStrategy(sslcontext))
                    .build();
    
            // 配置io线程
            IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setSoKeepAlive(false).setTcpNoDelay(true)
                    .setIoThreadCount(Runtime.getRuntime().availableProcessors())
                    .build();
            // 设置连接池大小
            ConnectingIOReactor ioReactor;
            ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
            PoolingNHttpClientConnectionManager conMgr = new PoolingNHttpClientConnectionManager(
                    ioReactor, null, sessionStrategyRegistry, null);
    
            if (poolSize > 0) {
                conMgr.setMaxTotal(poolSize);
            }
    
            if (maxPerRoute > 0) {
                conMgr.setDefaultMaxPerRoute(maxPerRoute);
            } else {
                conMgr.setDefaultMaxPerRoute(10);
            }
    
            ConnectionConfig connectionConfig = ConnectionConfig.custom()
                    .setMalformedInputAction(CodingErrorAction.IGNORE)
                    .setUnmappableInputAction(CodingErrorAction.IGNORE)
                    .setCharset(Consts.UTF_8).build();
    
            Lookup<AuthSchemeProvider> authSchemeRegistry;
            authSchemeRegistry = RegistryBuilder
                    .<AuthSchemeProvider> create()
                    .register(AuthSchemes.BASIC, new BasicSchemeFactory())
                    .register(AuthSchemes.DIGEST, new DigestSchemeFactory())
                    .register(AuthSchemes.NTLM, new NTLMSchemeFactory())
                    .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
                    .register(AuthSchemes.KERBEROS, new KerberosSchemeFactory())
                    .build();
            conMgr.setDefaultConnectionConfig(connectionConfig);
    
            if (proxy) {
                return HttpAsyncClients.custom().setConnectionManager(conMgr)
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setDefaultAuthSchemeRegistry(authSchemeRegistry)
                        .setProxy(new HttpHost(host, port))
                        .setDefaultCookieStore(new BasicCookieStore())
                        .setDefaultRequestConfig(requestConfig).build();
            } else {
                return HttpAsyncClients.custom().setConnectionManager(conMgr)
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setDefaultAuthSchemeRegistry(authSchemeRegistry)
                        .setDefaultCookieStore(new BasicCookieStore()).build();
            }
    
        }
    
        public CloseableHttpAsyncClient getAsyncHttpClient() {
            return asyncHttpClient;
        }
    
        public CloseableHttpAsyncClient getProxyAsyncHttpClient() {
            return proxyAsyncHttpClient;
        }
    }
    public class HttpClientFactory {
        private static HttpAsyncClient httpAsyncClient = new HttpAsyncClient();
    
        private HttpClientFactory() {
        }
    
        private static HttpClientFactory httpClientFactory = new HttpClientFactory();
    
        public static HttpClientFactory getInstance() {
    
            return httpClientFactory;
    
        }
    
        public HttpAsyncClient getHttpAsyncClientPool() {
            return httpAsyncClient;
        }
    
    }
    public void sendThredPost(List<FaceBookUserQuitEntity> list,String title,String subTitle,String imgUrl){
           if(list == null || list.size() == 0){
               new BusinessException("亚洲查询用户数据为空");
           }
           int number = list.size();
           int num = number / 10;
           PostThread[] threads = new PostThread[1];
           if(num > 0){
               threads = new PostThread[10];
               for(int i = 0; i <= 9; i++) {
                   List<FaceBookUserQuitEntity> threadList = list.subList(i * num, (i + 1) * num > number ? number : (i + 1) * num);
                   if (threadList == null || threadList.size() == 0) {
                       new BusinessException("亚洲切分用户数据为空");
                   }
                   threads[i] = new PostThread(HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(),
                           threadList, title, subTitle, imgUrl);
               }
                   for (int k = 0; k< threads.length; k++) {
                       threads[k].start();
                       logger.info("亚洲线程: {} 启动",k);
                   }
                   for (int j = 0; j < threads.length; j++) {
                       try {
                           threads[j].join();
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                   }
           }else{
               threads[0] = new PostThread(HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(),
                       list,title,subTitle, imgUrl);
               threads[0].start();
               try {
                   threads[0].join();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
     public PostThread(CloseableHttpAsyncClient httpClient, List<FaceBookUserQuitEntity> list, String title, String subTitle,String imgUrl){
            this.httpClient = httpClient;
            this.list = list;
            this. title= title;
            this. subTitle= subTitle;
            this. imgUrl= imgUrl;
        }
        @Override
        public void run() {
            try {
                int size = list.size();
                for (int k = 0; k < size; k += 100) {
                    List<FaceBookUserQuitEntity> subList = new ArrayList<FaceBookUserQuitEntity>();
                    if (k + 100 < size) {
                        subList = list.subList(k, k + 100);
                    } else {
                        subList = list.subList(k, size);
                    }
                    if(subList.size() > 0){
                        httpClient.start();
                        final long startTime = System.currentTimeMillis();
                        final CountDownLatch latch = new CountDownLatch(subList.size());
                        for (FaceBookUserQuitEntity faceBookEntity :  subList) {
                            String senderId = faceBookEntity.getSenderId();
                            String player_id = faceBookEntity.getPlayer_id();
                            logger.info("开始发送消息:playerid=" + player_id);
                            String bodyStr = getPostbody(senderId, player_id, title, subTitle,
                                    imgUrl, "Play Game", "");
                            if (!bodyStr.isEmpty()) {
                                final HttpPost httpPost = new HttpPost(URL);
                                StringEntity stringEntity = new StringEntity(bodyStr, "utf-8");
                                stringEntity.setContentEncoding("UTF-8");
                                stringEntity.setContentType("application/json");
                                httpPost.setEntity(stringEntity);
                                httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
                                    @Override
                                    public void completed(HttpResponse result) {
                                        latch.countDown();
                                        int statusCode = result.getStatusLine().getStatusCode();
                                        if(200 == statusCode){
                                            logger.info("请求发消息成功="+bodyStr);
                                            try {
                                                logger.info(EntityUtils.toString(result.getEntity(), "UTF-8"));
                                            } catch (IOException e) {
                                                e.printStackTrace();
                                            }
                                        }else{
                                            logger.info("请求返回状态="+statusCode);
                                            logger.info("请求发消息失败="+bodyStr);
                                            try {
                                                logger.info(EntityUtils.toString(result.getEntity(), "UTF-8"));
                                            } catch (IOException e) {
                                                e.printStackTrace();
                                            }
                                        }
                                    }
    
                                    @Override
                                    public void failed(Exception ex) {
                                        latch.countDown();
                                        logger.info("请求发消息失败e="+ex);
                                    }
    
                                    @Override
                                    public void cancelled() {
                                        latch.countDown();
                                    }
                                });
                            }
                        }
                        try {
                            latch.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        long leftTime = 10000 - (System.currentTimeMillis() - startTime);
                        if (leftTime > 0) {
                            try {
                                Thread.sleep(leftTime);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            } catch (UnsupportedCharsetException e) {
                e.printStackTrace();
            }
        }

    以上工具代码可直接使用,发送逻辑代码需适当修改。



  • 相关阅读:
    83. Remove Duplicates from Sorted List
    35. Search Insert Position
    96. Unique Binary Search Trees
    94. Binary Tree Inorder Traversal
    117. Populating Next Right Pointers in Each Node II
    116. Populating Next Right Pointers in Each Node
    111. Minimum Depth of Binary Tree
    169. Majority Element
    171. Excel Sheet Column Number
    190. Reverse Bits
  • 原文地址:https://www.cnblogs.com/xiaoheis/p/9670724.html
Copyright © 2011-2022 走看看