zoukankan      html  css  js  c++  java
  • CountDownLatch 和 CyclicBarrier

    一、CountDownLatch

      其实要完成这种某个线程等待其他线程结果才能开始任务的业务,直接在需要准备的线程中join()依赖的线程就能完成要求,但是在博客的上一篇《三个线程顺序输出》中也说到过,join的线程返回,必须是子线程已经结束。而CountDownLatch提供了更灵活的方案,可在子线程完成好其他线程依赖的工作后调用countDown()方法主动减少计数,同时继续做线程间业务无依赖的其他工作。

      这里CountDownLatch只提供了一个构造方法,参数为大于0整数:

     public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }

      具体演示如下:

    public class CountDownLatchTest {
    
        private static CountDownLatch counter = new CountDownLatch(2);
    
        public static void main(String[] args) throws InterruptedException {
            Thread threadA = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("子线程:" + Thread.currentThread().getName() + " start working!");
                    try {
                        Thread.sleep(2000);
                        counter.countDown();
                        System.out.println("子线程:" + Thread.currentThread().getName() + " working finish!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "thread_A");
            Thread threadB = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("子线程:" + Thread.currentThread().getName() + " start working!");
                    try {
                        Thread.sleep(1000);
                        counter.countDown();
                        System.out.println("子线程:" + Thread.currentThread().getName() + " working finish!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "thread_B");
            threadA.start();
            threadB.start();
            counter.await();
            System.out.println("A、B准备就绪!");
            // do something C...
        }
    }

      结果:

      可以看到在counter.countDown()被调用两次后,主线程便继续执行,同时子线程A、B仍可继续完成各自的工作。需要注意的是,这里初始化的计数值为2,调用2次正好返回,如果初始值为3,而实际只调用2次,那么程序就会在await()处无限等待。因此await()也提供了带等待时间的使用方法:

    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

      例如把测试程序中的await()改成如下:

    counter.await(1000,TimeUnit.MILLISECONDS);

      可以看到结果中,await()在没有等到计数归0,就回到主线程继续执行:

      另外CountDownLatch也提供了getCount()方法随时获取当前剩余计数。

      

    二、CyclicBarrier

      译作同步屏障,提供了2种初始化方法,第一种为简单int参数:

        public CyclicBarrier(int parties) {
            this(parties, null);
        }

      即挂起当前线程,直到所有设定的线程都到达Barrier状态,再一起执行后续任务。

        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }

      第二种方法可以设定一个任务,在程序都到达设定的barrier后,再执行barrierAction,截取部分源码如下:

            if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }    

      可以看到在index为0时,如果command不为null,会执行run()方法。注意run()方法并不是start(),也就是并没有再起一个线程,而是在最后到达barrier的线程中继续执行barrierCommand,再返回。

      这里直接演示第二种初始化下的使用,代码如下:

    public class CyclicBarrierTest {
    
        private static int N = 3;
    
        private static CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                System.out.println("barrierAction:" + Thread.currentThread().getName() + " 开始工作");
                try {
                    Thread.sleep(9000);
                    System.out.println("barrierAction:" + Thread.currentThread().getName() + " 结束工作");
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    
        static class Writer extends Thread {
    
            private CyclicBarrier cyclicBarrier;
    
            public Writer(CyclicBarrier barr) {
                this.cyclicBarrier = barr;
            }
    
            @Override
            public void run() {
                System.out.println("线程" + Thread.currentThread().getName() + "开始写入数据到文件...");
                long sleepTime = (long) (Math.random() * 1000 + 1000);
                try {
                    Thread.sleep(sleepTime);
                    System.out
                            .println("线程" + Thread.currentThread().getName() + "用时" + sleepTime + "ms写入数据完毕,等待其他线程完成写入...");
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println("所有线程写入完毕,继续处理其他任务...");
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < N; i++) {
                new Writer(barrier).start();
            }
    
        }
    }

      运行结果:

             

      特意多运行了几次,从结果中可以看出:

        1、在barrier处等待的线程数到达设定值(这里为3)前,当前线程会在cyclicBarrier.await()处挂起;

        2、如果设定了barrierAction任务,当最后到达barrier处的线程在执行await()方法时,会执行barrierAction任务,图中打出了每个任务的执行时间,可以看到每次都是执行任务时间最长,即最后到达barrier处的线程,会继续执行barrierAction方法;

        3、虽然到达了设定的await()线程数,但是必须在最后一个到达的线程执行完barrierAction方法,等await()返回后,所有线程才会继续执行后续的代码,示例中barrierActio()方法故意设定了较长的等待时间,但是程序依然在等待await()返回。

      另外,为了避免await()无休止的等待,JDK同样提供了带等待时间的await方法,观察源码片段:

    if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
    }

      这里要特别注意的时当某个线程抛出TimeoutException异常后会调用breakBarrier()方法把broken值设置为true: 

        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }

      之所以这里提到这个,是因为在dowait()方法中,如果broken为真,函数将直接返回,抛出BrokenBarrierException,造成结果:

        1、如果使用了带barrierCommand方法的构造函数,这里barrierCommand将不会执行;

        2、如果多个线程设置的await()时间不一样,例如A设置的等待时间较短,假设1秒后超时返回,那么B、C即使设置更长的等待时间,他们也不会在await()处等待,而是在运行到await()后立刻抛出BrokenBarrierException。

    /**
         * Main barrier code, covering the various policies.
         */
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
    if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                int index = --count;
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

      使用带等待参数的await(),运行上文中的代码,故意使等待超时,结果如下:

      另外CyclicBarrier是可以重用的,这里直接给出@海子博客中的例子:

    public class Test {
        public static void main(String[] args) {
            int N = 2;
            CyclicBarrier barrier = new CyclicBarrier(N);
    
            for (int i = 0; i < N; i++) {
                new Writer(barrier).start();
            }
    
            try {
                Thread.sleep(25000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("CyclicBarrier重用");
    
            for (int i = 0; i < N; i++) {
                new Writer(barrier).start();
            }
        }
    
        static class Writer extends Thread {
            private CyclicBarrier cyclicBarrier;
    
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
    
            @Override
            public void run() {
                System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
                try {
                    Thread.sleep(5000); // 以睡眠来模拟写入数据操作
                    System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
    
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");
            }
        }
    }

      运行效果如图:

      

    三、CountDownLatch 和 CyclicBarrier区别

      1、从使用上可以发现二者侧重点不同,在CountDownLatch使用时其实是存在主线程和子线程的概念,子线程在准备好主线程需要的资源后,主线程结束等待,继续剩下的工作;而在CyclicBarrier使用中,并不存在主和次的说法,更像是一组线程在互相等待后,然后在同一时间,继续后面的操作,可以类比于现实中跑步比赛的场景,一组运动员在各自准备好起跑动作后,由裁判员发令后,统一起跑(额,这里想不到合适的业务场景,尴想了一个强行解释,或许,在优化资源利用时,设置一定的请求数,攒满了后再一起请求?有实际的应用场景欢迎补充)。

      2、等待超时CountDownLatch会直接返回,继续后续工作,CyclicBarrier首先会抛TimeoutException,同时如果barrier要等的线程数大于1,其他线程不会按设定的等待时间等待,而是抛出BrokenBarrierException后直接返回,所以使用CyclicBarrier要注意异常处理的逻辑。

      3、使用细节差异如下表:

          

       4、最后要提的是CountDownLatch 和 CyclicBarrier虽然基本都是在多线程中使用,但是同一个线程多次调用countDown()和await()其实也会使计数改变。

    参考资料:

      1、http://www.importnew.com/21889.html

      2、https://blog.csdn.net/tolcf/article/details/50925145

  • 相关阅读:
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    微信小程序TodoList
    C语言88案例-找出数列中的最大值和最小值
    C语言88案例-使用指针的指针输出字符串
  • 原文地址:https://www.cnblogs.com/lyInfo/p/9131103.html
Copyright © 2011-2022 走看看