zoukankan      html  css  js  c++  java
  • CountDownLatch、CyclicBarrier和Semaphore

    一.CountDownLatch用法

    CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

    CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。

    当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,你只需要把这个CountDownLatch的引用传递到线程里。

    其他方法

    如果有某个解析sheet的线程处理的比较慢,我们不可能让主线程一直等待,所以我们可以使用另外一个带指定时间的await方法,await(long time, TimeUnit unit): 这个方法等待特定时间后,就会不再阻塞当前线程。join也有类似的方法。

    注意:计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。

    public class CountDownLatchTest {
        private AtomicInteger atomicInteger = new AtomicInteger();
    
        @Test
        public void cuntDownLatchTest() throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(3);
            ExecutorService executorService = Executors.newFixedThreadPool(3);
    
            executorService.execute(() -> {
                atomicInteger.addAndGet(1000);
                countDownLatch.countDown();
            });
            executorService.execute(() -> {
                atomicInteger.addAndGet(1000);
                countDownLatch.countDown();
            });
            executorService.execute(() -> {
                atomicInteger.addAndGet(1000);
                countDownLatch.countDown();
            });
            System.out.println("等待子线程执行");
            countDownLatch.await();
            System.out.println("子线程执行完毕");
            System.out.println(atomicInteger);
    
        }
    }
    

     执行结果

    等待子线程执行
    子线程执行完毕
    3000
    

    二.CyclicBarrier用法

    CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

    public class CyclicBarrierTest {
        private static final int THREAD_NUMBER = 3;
        private static CyclicBarrier sCyclicBarrier = new CyclicBarrier(
                THREAD_NUMBER, new Runnable() {
            @Override
            public void run() {
                System.out.println("大家都到达了宿舍楼下,一起出发吧。。。");
            }
        });
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
            for (int i = 0; i < THREAD_NUMBER; i++) {
                executorService.execute(new WalkFromDomitoryToCanteenRunnable(sCyclicBarrier, "同学" + i));
            }
            try {
                Thread.sleep(10000);//主线程睡眠
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("CyclicBarrier重用");
            for (int i = THREAD_NUMBER; i < THREAD_NUMBER * 2; i++) {
                executorService.execute(new WalkFromDomitoryToCanteenRunnable(sCyclicBarrier, "同学" + i));
            }
        }
    
        /**
         * 从宿舍到食堂线程
         *
         * @author LiuYi
         */
        public static class WalkFromDomitoryToCanteenRunnable implements Runnable {
            private CyclicBarrier mCyclicBarrier;
            private String mName;
    
            public WalkFromDomitoryToCanteenRunnable(CyclicBarrier cyclicBarrier,
                                                     String name) {
                this.mCyclicBarrier = cyclicBarrier;
                this.mName = name;
            }
    
            @Override
            public void run() {
                System.out.println(mName + "开始从宿舍出发。。。");
                try {
                    Thread.sleep(1000);
                    mCyclicBarrier.await();// 等待别同学
                    // 前往食堂
                    System.out.println(mName + "开始从宿舍楼下出发。。。");
                    Thread.sleep(1000);
                    System.out.println(mName + "达到食堂。。。");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

     执行结果

    同学0开始从宿舍出发。。。
    同学2开始从宿舍出发。。。
    同学1开始从宿舍出发。。。
    大家都到达了宿舍楼下,一起出发吧。。。
    同学0开始从宿舍楼下出发。。。
    同学2开始从宿舍楼下出发。。。
    同学1开始从宿舍楼下出发。。。
    同学0达到食堂。。。
    同学2达到食堂。。。
    同学1达到食堂。。。
    CyclicBarrier重用
    同学3开始从宿舍出发。。。
    同学4开始从宿舍出发。。。
    同学5开始从宿舍出发。。。
    大家都到达了宿舍楼下,一起出发吧。。。
    同学3开始从宿舍楼下出发。。。
    同学4开始从宿舍楼下出发。。。
    同学5开始从宿舍楼下出发。。。
    同学4达到食堂。。。
    同学5达到食堂。。。
    同学3达到食堂。。。
    

    CyclicBarrier的应用场景

    CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

    CyclicBarrier和CountDownLatch的区别

    • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
    • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。

    三.Semaphore用法

      Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

      Semaphore类位于java.util.concurrent包下,它提供了2个构造器:

    public Semaphore(int permits) {          //参数permits表示许可数目,即同时可以允许多少线程进行访问
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {    //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
        sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
    }
    

     假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:

    public class Test {
        public static void main(String[] args) {
            int N = 8;            //工人数
            Semaphore semaphore = new Semaphore(5); //机器数目
            for(int i=0;i<N;i++)
                new Worker(i,semaphore).start();
        }
         
        static class Worker extends Thread{
            private int num;
            private Semaphore semaphore;
            public Worker(int num,Semaphore semaphore){
                this.num = num;
                this.semaphore = semaphore;
            }
             
            @Override
            public void run() {
                try {
                    semaphore.acquire();
                    System.out.println("工人"+this.num+"占用一个机器在生产...");
                    Thread.sleep(2000);
                    System.out.println("工人"+this.num+"释放出机器");
                    semaphore.release();           
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

     执行结果

    工人0占用一个机器在生产...
    工人1占用一个机器在生产...
    工人2占用一个机器在生产...
    工人4占用一个机器在生产...
    工人5占用一个机器在生产...
    工人0释放出机器
    工人2释放出机器
    工人3占用一个机器在生产...
    工人7占用一个机器在生产...
    工人4释放出机器
    工人5释放出机器
    工人1释放出机器
    工人6占用一个机器在生产...
    工人3释放出机器
    工人7释放出机器
    工人6释放出机器
    
  • 相关阅读:
    sql中内连接和外连接的区别
    javascript优化--08模式(代码复用)01
    javascript优化--07模式(对象)02
    javascript优化--06模式(对象)01
    HTML-Audio/Video
    javascript优化--05模式(函数)
    javascript优化--04高质量编码
    javascript优化--03高质量编码
    javascript优化--02高质量编码
    Unicode编码
  • 原文地址:https://www.cnblogs.com/Genesisx/p/9367730.html
Copyright © 2011-2022 走看看