zoukankan      html  css  js  c++  java
  • Java并发之CyclicBarrier、CountDownLatch、Phaser

    在Java多线程编程中,经常会需要我们控制并发流程,等其他线程执行完毕,或者分阶段执行。Java在1.5的juc中引入了CountDownLatchCyclicBarrier,1.7中又引入了Phaser

    CountDownLatch

    A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

    一个或多个线程等待其他线程完成一系列操作后才执行。

    流程图

    基本使用:

    使用两个 countdown latches的示例。

    第一个是开始信号,在发出执行命令前,阻止线程开始执行。

    第二个是完成信号,直到所有线程执行完毕,主线程再开始执行。

    class Driver { // ...
        void main() throws InterruptedException {
            CountDownLatch startSignal = new CountDownLatch(1);
            CountDownLatch doneSignal = new CountDownLatch(N);
    
            for (int i = 0; i < N; ++i) // create and start threads
                new Thread(new Worker(startSignal, doneSignal)).start();
    
            doSomethingElse();            // don't let run yet
            startSignal.countDown();      // let all threads proceed
            doSomethingElse();
            doneSignal.await();           // wait for all to finish
        }
    }
    
    class Worker implements Runnable {
        private final CountDownLatch startSignal;
        private final CountDownLatch doneSignal;
        Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }
        public void run() {
            try {
                startSignal.await();
                doWork();
                doneSignal.countDown();
            } catch (InterruptedException ex) {} // return;
        }
    
        void doWork() { ... }
    }}
    

    CyclicBarrier

    A synchronizati on aid that allows a set of threads to all wait for each other to reach a common barrier point.

    多个线程互相等待,直到到达同一个同步点,再继续一起执行。CyclicBarrier适用于多个线程有固定的多步需要执行,线程间互相等待,当都执行完了,在一起执行下一步。

    流程图:

    基本使用:

    public class CyclicBarrierTest {
    
        static CyclicBarrier c = new CyclicBarrier(2, new A());
    
        public static void main(String[] args) {
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        c.await();
                    } catch (Exception e) {
    
                    }
                    System.out.println(1);
                }
            }).start();
    
            try {
                c.await();
            } catch (Exception e) {
    
            }
            System.out.println(2);
        }
    
        static class A implements Runnable {
            @Override
            public void run() {
                System.out.println(3);
            }
        }
    }
    

    以上的例子利用了CyclicBarrier提供的一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

    Phaser

    A reusable synchronization barrier, similar in functionality to* {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and* {@link java.util.concurrent.CountDownLatch CountDownLatch}* but supporting more flexible usage.

    Phaser拥有与CyclicBarrierCountDownLatch类似的功劳.但是这个类提供了更加灵活的应用。它支持任务在多个点都进行同步,支持动态调整注册任务的数量。当然你也可以使用CountDownLatch,但你必须创建多个CountDownLatch对象,每一阶段都需要一个CountDownLatch对象。

    流程图:

    基本使用:

    public class Match {
    
        // 模拟了100米赛跑,10名选手,只等裁判一声令下。当所有人都到达终点时,比赛结束。
        public static void main(String[] args) throws InterruptedException {
    
            final Phaser phaser=new Phaser(1) ;
            // 十名选手
            for (int index = 0; index < 10; index++) {
                phaser.register();
                new Thread(new player(phaser),"player"+index).start();
            }
            System.out.println("Game Start");
            //注销当前线程,比赛开始
            phaser.arriveAndDeregister();
            //是否非终止态一直等待
            while(!phaser.isTerminated()){
            }
            System.out.println("Game Over");
        }
    }
    class player implements Runnable{
    
        private  final Phaser phaser ;
    
        player(Phaser phaser){
            this.phaser=phaser;
        }
        @Override
        public void run() {
            try {
                // 第一阶段——等待创建好所有线程再开始
                phaser.arriveAndAwaitAdvance();
    
                // 第二阶段——等待所有选手准备好再开始
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println(Thread.currentThread().getName() + " ready");
                phaser.arriveAndAwaitAdvance();
    
                // 第三阶段——等待所有选手准备好到达,到达后,该线程从phaser中注销,不在进行下面的阶段。
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println(Thread.currentThread().getName() + " arrived");
                phaser.arriveAndDeregister();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
  • 相关阅读:
    Hadoop性能调优、YARN的内存和CPU配置
    linux权限之su和sudo的差别
    Hadoop JobHistory
    Hive存储格式
    左边元素和右边一样高
    状态码
    document.ready(function(){}),window.onload,$(function(){})的区别
    模糊查询实例
    jq实时监测输入框内容改变
    博客遇到的问题
  • 原文地址:https://www.cnblogs.com/aheizi/p/7195963.html
Copyright © 2011-2022 走看看