zoukankan      html  css  js  c++  java
  • 关于Phaser

    一.Phaser是什么

    java多线程技术提供了Phaser工具类,Phaser表示“阶段器”,一个可重用的同步barrier。

    用来解决控制多个线程分阶段共同完成任务的情景问题。其作用相比CountDownLatch和CyclicBarrier更加灵活。

    二.Phaser能做什么

    通过Phaser来调节不同阶段的执行,在执行一个阶段的任务之前,需要等待并达到一个动态数量的线程,执行才会继续,这个数量就是注册到Phaser的数量。

    在CountDownLatch中,该数字无法动态配置,需要在创建实例时提供。适用于一些需要分阶段的任务的处理。

    三.Phaser原理

    在Phaser内有2个重要状态,分别是phase和party。

    phase就是阶段,初值为0,当所有的线程执行完本轮任务,同时开始下一轮任务时,意味着当前阶段已结束,进入到下一阶段,phase的值自动加1。

    party就是线程,party=4就意味着Phaser对象当前管理着4个线程。

    Phaser还有一个重要的方法经常需要被重载,那就是boolean onAdvance(int phase, int registeredParties)方法。

    此方法有2个作用:

    (1)当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。

    (2)当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。

    1、注册机制:

    与其他barrier不同的是,Phaser中的“注册的同步者(parties)”会随时间而变化,Phaser可以通过构造器初始化parties个数,

    也可以在Phaser运行期间随时加入(register)新的parties,以及在运行期间注销(deregister)parties。运行时可以随时加入、注销parties,

    只会影响Phaser内部的计数器,它建立任何内部的bookkeeping(账本),因此task不能查询自己是否已经注册了,当然你可以通过实现子类来达成这一设计要求。

    此外,CyclicBarrier、CountDownLatch需要在初始化的构造函数中指定同步者的个数,且运行时无法再次调整。

    2、同步机制:

    类似于CyclicBarrier,Phaser也可以awaited多次,它的arrivedAndAwaitAdvance()方法的效果类似于CyclicBarrier的await()。

    Phaser的每个周期(generation)都有一个phase数字,phase 从0开始,当所有的已注册的parties都到达后(arrive)将会导致此phase数字自增(advance),

    当达到Integer.MAX_VALUE后继续从0开始。这个phase数字用于表示当前parties所处于的“阶段周期”,它既可以标记和控制parties的wait行为、唤醒等待的时机。

    3、中断(终止):

    Phaser可以进入Termination状态,可以通过isTermination()方法判断;当Phaser被终止后,所有的同步方法将会立即返回(解除阻塞),

    不需要等到advance(即advance也会解除阻塞),且这些阻塞方法将会返回一个负值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。

    当然,向一个termination状态的Phaser注册party将不会有效;此时onAdvance()方法也将会返回true(默认实现),即所有的parties都会被deregister,即register个数为0。

    4、Tiering(分层):

    Phaser可以“分层”,以tree的方式构建Phaser来降低“竞争”。如果一个Phaser中有大量parties,这会导致严重的同步竞争,

    所以我们可以将它们分组并共享一个parent Phaser,这样可以提高吞吐能力;Phaser中注册和注销parties都会有Child 和parent Phaser自动管理。

    当Child Phaser中中注册的parties变为非0时(在构造函数Phaser(Phaser parent,int parties),或者register()方法),Child Phaser将会注册到其Parent上;

    当Child Phaser中的parties变为0时(比如由arrivedAndDegister()方法),那么此时Child Phaser也将从其parent中注销出去。

    5、监控:

    同步的方法只会被register操作调用,对于当前state的监控方法可以在任何时候调用,比如getRegisteredParties()获取已经注册的parties个数,

    getPhase()获取当前phase周期数等;因为这些方法并非同步,所以只能反映当时的瞬间状态。

    四.Phaser使用

    示例一:

    通过Phaser控制多个线程的执行时机:有时候希望所有线程到达指定点后再同时开始执行,可以利用CyclicBarrier或CountDownLatch来实现,这里给出使用Phaser的版本。

    public class PhaserTest1 {
        public static void main(String[] args) {
            Phaser phaser = new Phaser();
            for (int i = 0; i < 10; i++) {
                phaser.register();                  // 注册各个参与者线程
           new Thread(new Task(phaser), "Thread-" + i).start();
            }
        }
    }
    
    class Task implements Runnable {
        private final Phaser phaser;
    
        Task(Phaser phaser) {
            this.phaser = phaser;
        }
    
        @Override
        public void run() {
            int i = phaser.arriveAndAwaitAdvance();     // 等待其它参与者线程到达
         // do something
            System.out.println(Thread.currentThread().getName() + ": 执行完任务,当前phase =" + i + "");
        }
    }

    输出:

    Thread-8: 执行完任务,当前phase =1
    Thread-4: 执行完任务,当前phase =1
    Thread-3: 执行完任务,当前phase =1
    Thread-0: 执行完任务,当前phase =1
    Thread-5: 执行完任务,当前phase =1
    Thread-6: 执行完任务,当前phase =1
    Thread-7: 执行完任务,当前phase =1
    Thread-9: 执行完任务,当前phase =1
    Thread-1: 执行完任务,当前phase =1
    Thread-2: 执行完任务,当前phase =1

    以上示例中,创建了10个线程,并通过register方法注册Phaser的参与者数量为10。

    当某个线程调用arriveAndAwaitAdvance方法后,arrive数量会加1,如果数量没有满足总数(参与者数量10),当前线程就是一直等待,当最后一个线程到达后,所有线程都会继续往下执行。

    注意:

    arriveAndAwaitAdvance方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance方法也不会返回或抛出异常,而是继续等待。

    示例二:

    通过Phaser实现开关。希望一些外部条件得到满足后,然后打开开关,线程才能继续执行,看下如何用Phaser来实现此功能。

    public class PhaserTest2 {
    
        public static void main(String[] args) throws IOException {
            Phaser phaser = new Phaser(1);       // 注册主线程,当外部条件满足时,由主线程打开开关
            for (int i = 0; i < 10; i++) {
                phaser.register();                      // 注册各个参与者线程
                new Thread(new Task2(phaser), "Thread-" + i).start();
            }
    
            // 外部条件:等待用户输入命令
            System.out.println("Press ENTER to continue");
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            reader.readLine();
    
            // 打开开关
            phaser.arriveAndDeregister();
            System.out.println("主线程打开了开关");
        }
    }
    
    class Task2 implements Runnable {
        private final Phaser phaser;
    
        Task2(Phaser phaser) {
            this.phaser = phaser;
        }
    
        @Override
        public void run() {
            int i = phaser.arriveAndAwaitAdvance();     // 等待其它参与者线程到达
    
            // do something
            System.out.println(Thread.currentThread().getName() + ": 执行完任务,当前phase =" + i + "");
        }
    }

    输出:

    主线程打开了开关
    Thread-7: 执行完任务,当前phase =1
    Thread-4: 执行完任务,当前phase =1
    Thread-3: 执行完任务,当前phase =1
    Thread-1: 执行完任务,当前phase =1
    Thread-0: 执行完任务,当前phase =1
    Thread-9: 执行完任务,当前phase =1
    Thread-8: 执行完任务,当前phase =1
    Thread-2: 执行完任务,当前phase =1
    Thread-5: 执行完任务,当前phase =1
    Thread-6: 执行完任务,当前phase =1

    以上示例中,只有当用户按下回车之后,任务才真正开始执行。

    这里主线程Main相当于一个协调者,用来控制开关打开的时机,arriveAndDeregister方法不会阻塞,该方法会将到达数加1,同时减少一个参与者数量,最终返回线程到达时的phase值。

    示例三:

    通过Phaser控制任务的执行轮数:

    public class PhaserTest3 {
        public static void main(String[] args) throws IOException {
    
            int repeats = 3;    // 指定任务最多执行的次数
    
            Phaser phaser = new Phaser() {
                @Override
                protected boolean onAdvance(int phase, int registeredParties) {
                    System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
                    return phase + 1 >= repeats  || registeredParties == 0;
                }
            };
    
            for (int i = 0; i < 10; i++) {
                phaser.register();                      // 注册各个参与者线程
           new Thread(new Task3(phaser), "Thread-" + i).start();
            }
        }
    }
    
    class Task3 implements Runnable {
        private final Phaser phaser;
    
        Task3(Phaser phaser) {
            this.phaser = phaser;
        }
    
        @Override
        public void run() {
            while (!phaser.isTerminated()) {   //只要Phaser没有终止, 各个线程的任务就会一直执行
                int i = phaser.arriveAndAwaitAdvance();     // 等待其它参与者线程到达
                // do something
                System.out.println(Thread.currentThread().getName() + ": 执行完任务");
            }
        }
    }

    输出:

    ---------------PHASE[0],Parties[5] ---------------
    Thread-4: 执行完任务
    Thread-1: 执行完任务
    Thread-2: 执行完任务
    Thread-3: 执行完任务
    Thread-0: 执行完任务
    ---------------PHASE[1],Parties[5] ---------------
    Thread-0: 执行完任务
    Thread-3: 执行完任务
    Thread-1: 执行完任务
    Thread-4: 执行完任务
    Thread-2: 执行完任务
    ---------------PHASE[2],Parties[5] ---------------
    Thread-2: 执行完任务
    Thread-4: 执行完任务
    Thread-1: 执行完任务
    Thread-0: 执行完任务
    Thread-3: 执行完任务

    以上示例中,在创建Phaser对象时,覆写了onAdvance方法,这个方法类似于CyclicBarrier中的barrierAction任务。

    也就是说,当最后一个参与者到达时,会触发onAdvance方法,入参phase表示到达时的phase值,registeredParties表示到达时的参与者数量,返回true表示需要终止Phaser。

    通过phase + 1 >= repeats ,来控制阶段(phase)数的上限为2(从0开始计),最终控制了每个线程的执行任务次数为repeats次。

    总结:

    (1)Phaser适用于多阶段多任务的场景,每个阶段的任务都可以控制得很细;

    (2)Phaser内部使用state变量及队列实现整个逻辑;

    (3)state的高32位存储当前阶段phase,中16位存储当前阶段参与者(任务)的数量parties,低16位存储未完成参与者的数量unarrived;

    (4)队列会根据当前阶段的奇偶性选择不同的队列;

    (5)当不是最后一个参与者到达时,会自旋或者进入队列排队来等待所有参与者完成任务;

    (6)当最后一个参与者完成任务时,会唤醒队列中的线程并进入下一个阶段;

  • 相关阅读:
    视图创建
    根据表格作业题
    表格 作业题练习
    创建表格 练习题
    聚合函数、数学函数、日期时间函数
    接口自动化框架
    python+request+Excel 几十份接口测试用例一起自动化测试的接口测试框架
    python3 函数
    pip源头
    爬虫
  • 原文地址:https://www.cnblogs.com/ZJOE80/p/12896169.html
Copyright © 2011-2022 走看看