zoukankan      html  css  js  c++  java
  • 并发处理-线程池

    之前写过一个HTTP Client 请求,刷新主机缓存,之前实现通过为前台获取主机地址 通过 for循环进行调用,通过测试之后就没在理,现在发现性能不足,

    遇到timeout情况会产生严重延迟效果,无法使用,现在将其改造成并发处理。

    之前有学习过并发,只是简单的了解线程,线程状态,线程安全等基本知识,联系过抢票等一下简单实例,具体开发没用到过,之前开发业务逻辑,也不需要,

    简单的逻辑即可,突然进行并发处理有点懵,话不多说,直接贴代码。下面详细解释。

    // HTTPClient     
    public static  String doGet(String url){
            String result = "";
            BufferedReader in = null;
            try {
                URL realUrl = new URL(url);
                // 打开和URL之间的连接
                HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
                // 设置通用的请求属性
                connection.setRequestProperty("accept", "*/*");
                connection.setRequestProperty("connection", "Keep-Alive");
                connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
                //设置超时
                System.setProperty("sun.net.client.defaultConnectTimeout", "5000");
                System.setProperty("sun.net.client.defaultReadTime", "5000");
                connection.setConnectTimeout(5000);
                connection.setReadTimeout(5000);
                // 建立实际的连接
                connection.connect();
                // 获取所有响应头字段
                Map<String, List<String>> map = connection.getHeaderFields();
                // 遍历所有的响应头字段
                for (String key : map.keySet()) {
                    log.debug(key + "--->" + map.get(key));
                }
                // 定义 BufferedReader输入流来读取URL的响应
                in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
                String line;
                while ((line = in.readLine()) != null) {
                    result += line;
                }
            } catch (MalformedURLException e) {
                return "FAILD";
            } catch (IOException e) {
                return "FAILD";
            }finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
            return result;
        }
    

      

    HTTP请求是无状态的请求,所以很容易造成超时处理,必须设定合适的相应控制范围,由于公司主机网络延迟比较严重

    设置5秒延迟时间

                System.setProperty("sun.net.client.defaultConnectTimeout", "5000"); //JDK 1.5之后推荐这么写,JVM层

                System.setProperty("sun.net.client.defaultReadTime", "5000");

                connection.setConnectTimeout(5000);      //JDK 1.5之前可以这么设置 链接层面

                connection.setReadTimeout(5000);

    关于超时的设置,网上找到两种方法,我都引用了,万无一失,一个是JVM层面,一个是链接层面

    //线程并发处理 
    // 内部类
    class HTTPThread implements Callable<IData> {
        private IData param  = null;
        private StringBuffer url = null;
        private CountDownLatch countDownLatch;
    
        public void setCountDownLatch(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
    
        public IData getParam() {
            return param;
        }
    
        public void setParam(IData instance) {
            this.param = instance;
            url = new StringBuffer("http://");
            url.append(instance.getString("HOST")+":");
            url.append(instance.getString("PORT"));
            url.append(instance.getString("CONTEXT",""));
            url.append(instance.getString("SERVLET"));
            url.append("?WadeSafeUpdate");
        }
    
        @Override
        public IData call() throws Exception {
            try {
                String state = HTTPClientAPI.doGet(url.toString());
                this.param.put("RESULT", state);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (countDownLatch != null) {
                    //递减锁存器的计数
                    countDownLatch.countDown();
                }
            }
            return this.param;
        }
    }
    

      

    // 调用方法
    /**
         * 刷新所有机器
         * @param param
         * @return
         * @throws Exception
         */
        public static IDataset refreshALLListener(IData param) throws Exception {
            int success = 0;
            int faild = 0;
            IDataset instances = queryListener(param);
            ArrayList<Future<IData>> results = new ArrayList<Future<IData>>();//
            ExecutorService executorService =Executors.newFixedThreadPool(10);
            CountDownLatch countDownLatch = new CountDownLatch(instances.size());
            for(int i=0; i<instances.size() ; i++){
                IData instance = instances.getData(i);
                HTTPThread httpThread = new HTTPThread();
                httpThread.setParam(instance);
                httpThread.setCountDownLatch(countDownLatch);
                results.add(executorService.submit(httpThread));
            }
            countDownLatch.await();
            executorService.shutdown();
            IDataset result = new DatasetList();
            for(int i=0; i<results.size() ; i++){
                result.add(results.get(i).get());
                IData res = result.getData(i);
                String state = res.getString("RESULT");
                if("SUCCESS".equals(state)){
                    success++;
                    res.put("FRESHSTATE","刷新成功");
                    res.put("REFRESHSTATE",state);
    
                    SQLParser parser=new SQLParser(res);
                    parser.addSQL(" update vest_server_instance set UPTIME = sysdate() where HOST = :HOST ");
                    parser.addSQL(" AND PORT = :PORT ");
                    parser.addSQL(" AND  CONTEXT = :CONTEXT ");
                    parser.addSQL(" AND SERVLET = :SERVLET");
                    BaseDAO dao = new BaseDAO();
                    dao.initial("base");
                    dao.executeUpdate(parser);
                    res.put("UPTIME", TIME_FORMAT.format(new Date()));
                }else{
                    faild++;
                    res.put("FRESHSTATE","刷新失败");
                    res.put("REFRESHSTATE",state);
                }
            }
            if(instances.size() > 0){
                instances.getData(0).put("SUCCESS",success);
                instances.getData(0).put("FAILD",faild);
            }
            return result;
        }
    
    }
    

      

    1:线程池

    在之前 没接触过线程池,开启线程都是通过Thread.run方法开启。但是随着new 的Thread 越来越多,创建-销毁,创建-销毁,就会造成资源的严重浪费,效率会下降,快速崩溃。线程池就可以帮助我们解决这个问题,他使线程可以重复使用,就是执行完一个任务线程不会被销毁,而是可以继续执行其他任务

    2:线程执行先后、如何同步、处理完成

    CountDownLatch 线程同步工具类,相当于一个计数器,通过初始化设置一个数量(只能设置一次,不能更改)。当线程完成任务后,会进行减一。

    3线程如何执行完成如何返回数据

    之前都是通过Runnable 接口来创建一个线程,但是run方法没有返回值,所以应该使用Callable,Callable的call方法可以根据你传入的泛型参数返回对应类型的数据。callable的call方法返回的数据是通过Future来接受的,有两个方法鼻祖知道,首先,可以用isDone()方法来查询Future是否已经完成,任务完成后,可以调用get()方法来获取结果 如果不加判断直接调用get方法,此时如果线程未完成,get将阻塞,直至结果准备就绪

    线程池:

    JAVA通过Executors创建线程池

    1.  newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    2.  newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    3.  newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
    4.  newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    线程池重要方法:

    1. execute()实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
    2. submit()是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。
    3. shutdown() 关闭线程池
    4. shutdownNow() 关闭线程池

     

  • 相关阅读:
    AFNetworking 3.0中调用[AFHTTPSessionManager manager]方法导致内存泄漏的解决办法
    UITableView自动计算cell高度并缓存
    iOS 10 应用内跳转到系统设置
    iOS 系统通知
    UITableViewDataSource TableView數據源協議
    HADOOP操作权限问题
    Echarts简单图表
    hadoop常见错误解决方法
    sqoop安装与简单实用
    hive的内置函数和自定义函数
  • 原文地址:https://www.cnblogs.com/Tonyzczc/p/10737047.html
Copyright © 2011-2022 走看看