zoukankan      html  css  js  c++  java
  • CountDownLatch 和 CyclicBarrier

    一、CountDownLatch

      其实要完成这种某个线程等待其他线程结果才能开始任务的业务,直接在需要准备的线程中join()依赖的线程就能完成要求,但是在博客的上一篇《三个线程顺序输出》中也说到过,join的线程返回,必须是子线程已经结束。而CountDownLatch提供了更灵活的方案,可在子线程完成好其他线程依赖的工作后调用countDown()方法主动减少计数,同时继续做线程间业务无依赖的其他工作。

      这里CountDownLatch只提供了一个构造方法,参数为大于0整数:

     public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }

      具体演示如下:

    public class CountDownLatchTest {
    
        private static CountDownLatch counter = new CountDownLatch(2);
    
        public static void main(String[] args) throws InterruptedException {
            Thread threadA = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("子线程:" + Thread.currentThread().getName() + " start working!");
                    try {
                        Thread.sleep(2000);
                        counter.countDown();
                        System.out.println("子线程:" + Thread.currentThread().getName() + " working finish!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "thread_A");
            Thread threadB = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("子线程:" + Thread.currentThread().getName() + " start working!");
                    try {
                        Thread.sleep(1000);
                        counter.countDown();
                        System.out.println("子线程:" + Thread.currentThread().getName() + " working finish!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "thread_B");
            threadA.start();
            threadB.start();
            counter.await();
            System.out.println("A、B准备就绪!");
            // do something C...
        }
    }

      结果:

      可以看到在counter.countDown()被调用两次后,主线程便继续执行,同时子线程A、B仍可继续完成各自的工作。需要注意的是,这里初始化的计数值为2,调用2次正好返回,如果初始值为3,而实际只调用2次,那么程序就会在await()处无限等待。因此await()也提供了带等待时间的使用方法:

    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

      例如把测试程序中的await()改成如下:

    counter.await(1000,TimeUnit.MILLISECONDS);

      可以看到结果中,await()在没有等到计数归0,就回到主线程继续执行:

      另外CountDownLatch也提供了getCount()方法随时获取当前剩余计数。

      

    二、CyclicBarrier

      译作同步屏障,提供了2种初始化方法,第一种为简单int参数:

        public CyclicBarrier(int parties) {
            this(parties, null);
        }

      即挂起当前线程,直到所有设定的线程都到达Barrier状态,再一起执行后续任务。

        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }

      第二种方法可以设定一个任务,在程序都到达设定的barrier后,再执行barrierAction,截取部分源码如下:

            if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }    

      可以看到在index为0时,如果command不为null,会执行run()方法。注意run()方法并不是start(),也就是并没有再起一个线程,而是在最后到达barrier的线程中继续执行barrierCommand,再返回。

      这里直接演示第二种初始化下的使用,代码如下:

    public class CyclicBarrierTest {
    
        private static int N = 3;
    
        private static CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                System.out.println("barrierAction:" + Thread.currentThread().getName() + " 开始工作");
                try {
                    Thread.sleep(9000);
                    System.out.println("barrierAction:" + Thread.currentThread().getName() + " 结束工作");
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    
        static class Writer extends Thread {
    
            private CyclicBarrier cyclicBarrier;
    
            public Writer(CyclicBarrier barr) {
                this.cyclicBarrier = barr;
            }
    
            @Override
            public void run() {
                System.out.println("线程" + Thread.currentThread().getName() + "开始写入数据到文件...");
                long sleepTime = (long) (Math.random() * 1000 + 1000);
                try {
                    Thread.sleep(sleepTime);
                    System.out
                            .println("线程" + Thread.currentThread().getName() + "用时" + sleepTime + "ms写入数据完毕,等待其他线程完成写入...");
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println("所有线程写入完毕,继续处理其他任务...");
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < N; i++) {
                new Writer(barrier).start();
            }
    
        }
    }

      运行结果:

             

      特意多运行了几次,从结果中可以看出:

        1、在barrier处等待的线程数到达设定值(这里为3)前,当前线程会在cyclicBarrier.await()处挂起;

        2、如果设定了barrierAction任务,当最后到达barrier处的线程在执行await()方法时,会执行barrierAction任务,图中打出了每个任务的执行时间,可以看到每次都是执行任务时间最长,即最后到达barrier处的线程,会继续执行barrierAction方法;

        3、虽然到达了设定的await()线程数,但是必须在最后一个到达的线程执行完barrierAction方法,等await()返回后,所有线程才会继续执行后续的代码,示例中barrierActio()方法故意设定了较长的等待时间,但是程序依然在等待await()返回。

      另外,为了避免await()无休止的等待,JDK同样提供了带等待时间的await方法,观察源码片段:

    if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
    }

      这里要特别注意的时当某个线程抛出TimeoutException异常后会调用breakBarrier()方法把broken值设置为true: 

        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }

      之所以这里提到这个,是因为在dowait()方法中,如果broken为真,函数将直接返回,抛出BrokenBarrierException,造成结果:

        1、如果使用了带barrierCommand方法的构造函数,这里barrierCommand将不会执行;

        2、如果多个线程设置的await()时间不一样,例如A设置的等待时间较短,假设1秒后超时返回,那么B、C即使设置更长的等待时间,他们也不会在await()处等待,而是在运行到await()后立刻抛出BrokenBarrierException。

    /**
         * Main barrier code, covering the various policies.
         */
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
    if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                int index = --count;
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

      使用带等待参数的await(),运行上文中的代码,故意使等待超时,结果如下:

      另外CyclicBarrier是可以重用的,这里直接给出@海子博客中的例子:

    public class Test {
        public static void main(String[] args) {
            int N = 2;
            CyclicBarrier barrier = new CyclicBarrier(N);
    
            for (int i = 0; i < N; i++) {
                new Writer(barrier).start();
            }
    
            try {
                Thread.sleep(25000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("CyclicBarrier重用");
    
            for (int i = 0; i < N; i++) {
                new Writer(barrier).start();
            }
        }
    
        static class Writer extends Thread {
            private CyclicBarrier cyclicBarrier;
    
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
    
            @Override
            public void run() {
                System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
                try {
                    Thread.sleep(5000); // 以睡眠来模拟写入数据操作
                    System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
    
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");
            }
        }
    }

      运行效果如图:

      

    三、CountDownLatch 和 CyclicBarrier区别

      1、从使用上可以发现二者侧重点不同,在CountDownLatch使用时其实是存在主线程和子线程的概念,子线程在准备好主线程需要的资源后,主线程结束等待,继续剩下的工作;而在CyclicBarrier使用中,并不存在主和次的说法,更像是一组线程在互相等待后,然后在同一时间,继续后面的操作,可以类比于现实中跑步比赛的场景,一组运动员在各自准备好起跑动作后,由裁判员发令后,统一起跑(额,这里想不到合适的业务场景,尴想了一个强行解释,或许,在优化资源利用时,设置一定的请求数,攒满了后再一起请求?有实际的应用场景欢迎补充)。

      2、等待超时CountDownLatch会直接返回,继续后续工作,CyclicBarrier首先会抛TimeoutException,同时如果barrier要等的线程数大于1,其他线程不会按设定的等待时间等待,而是抛出BrokenBarrierException后直接返回,所以使用CyclicBarrier要注意异常处理的逻辑。

      3、使用细节差异如下表:

          

       4、最后要提的是CountDownLatch 和 CyclicBarrier虽然基本都是在多线程中使用,但是同一个线程多次调用countDown()和await()其实也会使计数改变。

    参考资料:

      1、http://www.importnew.com/21889.html

      2、https://blog.csdn.net/tolcf/article/details/50925145

  • 相关阅读:
    IntelliJ IDEA教程之如何clean或者install Maven项目
    mysql 导出表,导出数据 命令
    import require
    https确实加密了。 抓包是一个中间人攻击过程
    密码学部分算法
    账号密码加密的方案
    查看git提交细节
    使用源安装java JDK
    updated stream stash changes
    Hibernate与Jpa的关系
  • 原文地址:https://www.cnblogs.com/lyInfo/p/9131103.html
Copyright © 2011-2022 走看看