zoukankan      html  css  js  c++  java
  • 高级java必会系列一:常用线程池和调度类

    众所周知,开启线程2种方法:第一是实现Runable接口,第二继承Thread类。(当然内部类也算...)常用的,这里就不再赘述。

    一、线程池

    1.newCachedThreadPool

           (1)缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse,如果没有,就建立一个新的线程加入池中;

            (2)缓存型池子,通常用于执行一些生存周期很短的异步型任务;因此一些面向连接的daemon型server中用得不多;

            (3)能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。

            (4)注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止

    2.newFixedThreadPool--本人常用

            (1)newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程

            (2)其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子

            (3)和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器

            (4)从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:
          fixed池线程数固定,并且是0秒IDLE(无IDLE)
          cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE 

    3.ScheduledThreadPool

            (1)调度型线程池

            (2)这个池子里的线程可以按schedule依次delay执行,或周期执行

    4.SingleThreadExecutor

            (1)单例线程,任意时间池中只能有一个线程

            (2)用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)

    二、常用线程调度类

    1.wait、notify、notifyAll-----不建议新手直接使用

    顾名思义,wait是等待,notify是通知一个等待线程、notifyAll唤醒所有等待线程

    2.CountDownLatch----很适合用来将一个任务分为n个独立的部分,等这些部分都完成后继续接下来的任务

    隶属于java.util.concurrent包。CountDownLatch类是一个同步计数器,构造时传入int参数,该参数就是计数器的初始值,每调用一次countDown()方法,计数器减1,计数器大于0 时,await()方法会阻塞程序继续执行.当多个线程达到预期时(latch.countDown()),唤醒多个其他等待中的线程,即执行latch.await()后面的代码。样例是,张三、李四合作完成任务,张三5秒,李四8秒,当张三李四都完成后,总任务结束。代码如下:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.CountDownLatch;
    
    public class CountDownLatchDemo {  
        final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
        
        public static void main(String[] args) throws InterruptedException {  
            CountDownLatch latch=new CountDownLatch(2);//两个工人的协作  
            Worker worker1=new Worker("张三", 5000, latch);  
            Worker worker2=new Worker("李四", 8000, latch);  
            worker1.start(); 
            worker2.start();  
            latch.await();//阻塞!等待所有工人完成工作  
            System.out.println("all work done at "+sdf.format(new Date()));  
        }  
          
        static class Worker extends Thread{  
            String workerName;   
            int workTime;  
            CountDownLatch latch;  
            public Worker(String workerName ,int workTime ,CountDownLatch latch){  
                 this.workerName=workerName;  
                 this.workTime=workTime;  
                 this.latch=latch;  
            } 
            
            public void run(){  
                System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));  
                doWork();//工作了  
                System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));  
                latch.countDown();//工人完成工作,计数器减一  
    
            }  
              
            private void doWork(){  
                try {  
                    Thread.sleep(workTime);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }

    -----------------------------------------------
    Worker 李四 do work begin at 2016-11-02 18:25:28
    Worker 张三 do work begin at 2016-11-02 18:25:28
    Worker 张三 do work complete at 2016-11-02 18:25:33
    Worker 李四 do work complete at 2016-11-02 18:25:36
    all work done at 2016-11-02 18:25:36

    测试可见,张三李四共同协作完成。

    3.CyclicBarrier----适合多线程循环到达屏障后再执行

     字面意思循环屏障,可理解为栅栏,协同多个线程都执行到barrier.await时,如果构造CyclicBarrier barrier=new CyclicBarrier(2, Runnable)时,第一个参数代码线程数,如果有第二参Runnable,那么所有线程都await时,先执行Runnable,再各自执行await后续的代码。

    CyclicBarrier和CountDownLatch区别

    1.CountDownLatch在多个线程都执行完毕latch.countDown后唤醒await线程,多个countDown子线程在执行完countDown后可继续执行后续代码。

    2.CyclicBarrier可循环使用,CountDownLatch只1次。见代码示例:

    3.CountDownLatch需要latch.countDownlatch.await()配合使用。CyclicBarrier就一个barrier.await。

    下面举例:鸟、鱼2个线程同时运行问题。

      1 package study.thread;
      2 
      3 import java.util.concurrent.BrokenBarrierException;  
      4 import java.util.concurrent.CyclicBarrier;  
      5   
      6 /** 
      7  * 循环栅栏(屏障)
      8  * 问题:一个池塘,有很多鸟和很多鱼,鸟每分钟产生一个后代,鱼每30秒钟产生2个后代。
      9  * 鸟每10秒钟要吃掉一条鱼。建一个池塘,初始化一些鱼和鸟,看看什么时候鸟把鱼吃光。 
     10  * 
     11  */  
     12 public class CyclicBarrierDemo {  
     13   
     14     long time ;  
     15     long birdNum ;  
     16     long fishNum ;  
     17     Object lock = new Object() ;  
     18     CyclicBarrier barrier  ;  
     19       
     20     public CyclicBarrierDemo(long birdNum , long fishNum){  
     21         this.birdNum = birdNum ;  
     22         this.fishNum = fishNum ;  
     23     }  
     24   
     25     /**
     26      * 入口
     27      * @param args
     28      */
     29     public static void main(String[] args) { 
     30         //构造demo,初始化5只秒,20条鱼
     31         CyclicBarrierDemo bf = new CyclicBarrierDemo(5 , 20) ;  
     32         //生态圈开启
     33         bf.start();   
     34     }  
     35   
     36     //生态圈开启
     37     public void start(){  
     38         //构造鱼,鸟,时间线
     39         FishThread fish = new FishThread() ;  
     40         BirdThread bird = new BirdThread() ;  
     41         TimeLine tl = new TimeLine() ;  
     42   
     43         //初始化环形屏障,当barrier对象的await方法被调用两次之后,将会执行tl线程  
     44         barrier = new CyclicBarrier(2, tl) ;//这里要注意第一个参数,如果大于调用await的线程数,会死锁。  
     45   
     46         //鱼、鸟动起来
     47         fish.start();  
     48         bird.start();  
     49   
     50     }  
     51   
     52     public void printInfo(String source){  
     53         System.out.printf(source+"time[%d]:birdNum[%d] ,fishNum[%d]
    " ,time , birdNum , fishNum);  
     54     }  
     55   
     56     private class TimeLine implements Runnable {  
     57         @Override  
     58         public void run() { //所有子任务都调用了await方法后,将会执行该方法, 然后所有子线程继续执行  
     59             System.out.println("TimeLine start!");
     60             //如果鱼数量<=0,结束程序
     61             if(fishNum <= 0){  
     62                 System.exit(-1);     
     63             }
     64             //时间加10秒
     65             time += 10 ; 
     66             System.out.println("TimeLine end,时间加10秒!");
     67         }  
     68     }  
     69   
     70     private class FishThread extends Thread {  
     71         @Override  
     72         public void run() {  
     73             //循环
     74             while(true){  
     75                 try { 
     76                     System.out.println("鱼已经就位!到达await!");
     77                     barrier.await() ;   //进入睡眠, 等待所有子任务都进入睡眠  然后再继续  
     78                 } catch (InterruptedException | BrokenBarrierException e) {  
     79                     e.printStackTrace();  
     80                 }  
     81                 synchronized (lock) {
     82                     //鱼每30秒钟产生2个后代
     83                     if(time % 30 == 0){
     84                         fishNum += fishNum * 2;  
     85                         printInfo("鱼动作执行!");
     86                     }  
     87                 }  
     88             }  
     89         }  
     90     }  
     91   
     92     private class BirdThread extends Thread{  
     93         @Override  
     94         public void run() {
     95             //循环
     96             while(true){  
     97                 try {  
     98                     System.out.println("鸟已经就位!到达await!");
     99                     barrier.await() ;  //进入睡眠, 等待所有子任务都进入睡眠  然后再继续  
    100                 } catch (InterruptedException | BrokenBarrierException e) {  
    101                     e.printStackTrace();  
    102                 }    
    103                 synchronized (lock) {
    104                     //鸟每10秒钟要吃掉一条鱼
    105                     if(time % 10 == 0){  
    106                         fishNum = fishNum >= birdNum ? fishNum - birdNum : 0 ;    
    107                         //鸟每分钟产生一个后代
    108                         if(time % 60 == 0){  
    109                             birdNum += birdNum ;  
    110                         }  
    111                         printInfo("鸟动作执行!");  
    112                     }  
    113                 }  
    114   
    115             }  
    116   
    117         }  
    118   
    119     }  
    120   
    121 }  

    4.Semaphore---通过控制操作系统的信号量数目来控制并发,比控制线程并发数粒度更细。

    管理固定数值的信号量,用以控制并发的数量。把需要并发的代码放在acquirerelease之间即可。acquire获取信号,release释放信号。如果Semaphore管理一个信号量,就是互斥锁。

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest {
    
         public static void main(String[] args) {  
            // 线程池 
            ExecutorService exec = Executors.newCachedThreadPool();  
            // 只能5个线程同时访问 
            final Semaphore semp = new Semaphore(5);  
            // 模拟20个客户端访问 
            for (int index = 0; index < 20; index++) {
                final int NO = index;  
                Runnable run = new Runnable() {  
                    public void run() {  
                        try {  
                            //获取许可 
                            semp.acquire();  
                            System.out.println("Accessing: " + NO);  
                            Thread.sleep(2000);  
                        } catch (InterruptedException e) { 
                            e.printStackTrace();
                        } finally{
                            //释放 
                            semp.release();
                  System.out.println("-----------------"+semp.availablePermits());
    } } }; exec.execute(run); }
    // 退出线程池 exec.shutdown(); } }

    5.Exchanger

    用于两个线程之间进行数据交换,先执行exchanger.exchange()的线程等待后来的线程到达,然后交换数据,最后再继续向下执行。

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Exchanger;
    
    /**
     * 
     * @ClassName: ExchangerDemo
     * @Description: 用于两个线程之间进行数据交换,先执行exchanger.exchange()的线程等待后来的线程到达,然后交换数据,最后再继续向下执行。
     * @author denny.zhang
     * @date 2016年11月4日 下午1:27:29
     *
     */
    public class ExchangerDemo {
        public static void main(String[] args) {
            final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
            
            new Thread(){
                public void run(){
                    List<Integer> list = new ArrayList<Integer>();
                    list.add(1);
                    list.add(2);
                    try {
                        list = exchanger.exchange(list);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Thread1"+list);
                }
            }.start();
            
            new Thread(){
                public void run(){
                    List<Integer> list = new ArrayList<Integer>();
                    list.add(3);
                    list.add(4);
                    try {
                        list = exchanger.exchange(list);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Thread2"+list);
                }
            }.start();
        }
    }

    6.Future和FutrueTask---常用!

    Future是接口,FutrueTask是接口实现类。场景:多线程并发执行,返回结果放进list.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    /**
     * 
     * @ClassName: FutureDemo
     * @Description: Future
     * @author denny.zhang
     * @date 2016年11月4日 下午1:50:32
     *
     */
    public class FutureDemo {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            //结果集
            List<Integer> list = new ArrayList<Integer>();
            //开启多线程
            ExecutorService exs = Executors.newFixedThreadPool(3);
            List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
            //启动线程池,固定线程数为3
            for(int i=0;i<3;i++){
                //提交任务,添加返回
                futureList.add(exs.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        return 1;
                    }
                }));
            }
            //结果归集
            for (Future<Integer> future : futureList) {
                while (true) {
                    if (future.isDone()&& !future.isCancelled()) {
                        Integer i = future.get();
                        list.add(i);
                        break;
                    } else {
                        Thread.sleep(100);
                    }
                }
            }
            System.out.println("list="+list);
        }
    }

    返回:list=[1, 1, 1]

     ====================================

    参考:

    《大型网站系统与java中间件实践》

  • 相关阅读:
    async源码学习
    js 数组去重
    node通过http.request向其他服务器上传文件
    学习CSS布局
    学习CSS布局
    学习CSS布局
    学习CSS布局
    学习CSS布局
    学习CSS布局
    学习CSS布局
  • 原文地址:https://www.cnblogs.com/dennyzhangdd/p/6024002.html
Copyright © 2011-2022 走看看