zoukankan      html  css  js  c++  java
  • Java并发编程-移相器

    移相器(Phaser)内有2个重要状态,分别是phase和party。
    phase就是阶段,初值为0,当所有的线程执行完本轮任务,同时开始下一轮任务时,意味着当前阶段已结束,进入到下一阶段,phase的值自动加1。
    party就是线程,party=4就意味着Phaser对象当前管理着4个线程。
    Phaser还有一个重要的方法经常需要被重载,那就是boolean onAdvance(int phase, int registeredParties)方法。此方法有2个作用:
    1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
    2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。
    例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
    Phaser把多个线程执行的任务划分成多个阶段(phase),编程时要明确各个阶段的任务,每个阶段都可以有任意个参与者,线程可以随时注册并参与到某个阶段,当一个阶段中所有线程都成功完成之后,Phaser的onAdvance()被调用,可以通过覆盖添加自定义处理逻辑(类似循环屏障(关卡)使用的Runnable接口),然后Phaser类会自动进入下个阶段。如此循环,直到Phaser不再包含任何参与者。
    register(),bulkRegister(),动态添加一个或多个参与者。
    arrive(),某个参与者完成任务后调用
    arriveAndDeregister(),任务完成,取消自己的注册。
    arriveAndAwaitAdvance(),自己完成等待其他参与者完成:进入阻塞,直到Phaser成功进入下个阶段。
    awaitAdvance()、awaitAdvanceInterruptibly(),等待phaser进入下个阶段,参数为当前阶段的编号,后者可以设置超时和处理中断请求。
    另外,Phaser的一个重要特征是多个Phaser可以组成树形结构,Phaser提供了构造方法来指定当前对象的父对象;当一个子对象参与者>0,会自动注册到父对象中;当=0,自动解除注册。

    package org.suxuan;
    
    import java.util.concurrent.Phaser;
    import java.util.concurrent.atomic.AtomicReferenceArray;
    
    public class PhaserDemo {
        public static void main(String[] args) {
            final int workers = 2;
            final int workLength = 10;
    
            final Phaser phaser = new Phaser(workers + 1);
            final AtomicReferenceArray<String> lane1 = new AtomicReferenceArray<String>(new String[workLength]);
            final AtomicReferenceArray<String> lane2 = new AtomicReferenceArray<String>(new String[workLength]);
    
            new Thread("Producer 1") {
                @Override
                public void run() {
                    for (int i = 0; i < workLength; i++) {
                        $sleep(20);
    
                        lane1.set(i, "lane1-answer-" + i);
    
                        System.out.printf("[%-17s] working in lane1 finished phase [%d]%n",
                                Thread.currentThread().getName(), phaser.getPhase());
    
                        phaser.arriveAndAwaitAdvance();
                    }
                }
            }.start();
    
            new Thread("Slower producer 2") {
                @Override
                public void run() {
                    for (int i = 0; i < workLength; i++) {
                        $sleep(40);
    
                        lane2.set(i, "lane2-answer-" + i);
    
                        System.out.printf("[%-17s] working in lane2 finished phase [%d]%n",
                                Thread.currentThread().getName(), phaser.getPhase());
    
                        phaser.arriveAndAwaitAdvance();
                    }
                }
            }.start();
    
            new Thread("Slow consumer") {
                @Override
                public void run() {
                    for (int start = 0; start < workLength; ) {
                        System.out.printf("[%-17s] about to wait for phase [%d] completion%n",
                                Thread.currentThread().getName(), start);
    
                        int phaseInProgress = phaser.awaitAdvance(start);
    
                        //Read all the way up to the most recent completed phases.
                        for (int i = start; i < phaseInProgress; i++) {
                            System.out.printf("[%-17s] read [%s] & [%s] from phase [%d]%n",
                                    Thread.currentThread().getName(), lane1.get(i), lane2.get(i), i);
                        }
    
                        start = phaseInProgress;
    
                        $sleep(80);
                    }
                }
            }.start();
    
            phaser.arriveAndDeregister();
        }
    
        private static void $sleep(long millis) {
            try {
                Thread.sleep(millis);
            }
            catch (InterruptedException e) {
            }
        }
    }
    


    Phaser初始化为3,之后customer线程取消自己的注册(此时只有两个生产者线程间进行同步),awaitAdvance不会阻塞,它直接返回。
    两个生产者线程,依次生成结果放置到AtomicReferenceArray中。消费者线程每睡眠80Ms后,从结果集中,把当前已经完成的结果打印出来。
    某次运行结果如下:

    [Slow consumer    ] about to wait for phase [0] completion
    [Producer 1       ] working in lane1 finished phase [0]
    [Slower producer 2] working in lane2 finished phase [0]
    [Slow consumer    ] read [lane1-answer-0] & [lane2-answer-0] from phase [0]
    [Producer 1       ] working in lane1 finished phase [1]
    [Slower producer 2] working in lane2 finished phase [1]
    [Producer 1       ] working in lane1 finished phase [2]
    [Slow consumer    ] about to wait for phase [1] completion
    [Slow consumer    ] read [lane1-answer-1] & [lane2-answer-1] from phase [1]
    [Slower producer 2] working in lane2 finished phase [2]
    [Producer 1       ] working in lane1 finished phase [3]
    [Slower producer 2] working in lane2 finished phase [3]
    [Producer 1       ] working in lane1 finished phase [4]
    [Slower producer 2] working in lane2 finished phase [4]
    [Producer 1       ] working in lane1 finished phase [5]
    [Slow consumer    ] about to wait for phase [2] completion
    [Slow consumer    ] read [lane1-answer-2] & [lane2-answer-2] from phase [2]
    [Slow consumer    ] read [lane1-answer-3] & [lane2-answer-3] from phase [3]
    [Slow consumer    ] read [lane1-answer-4] & [lane2-answer-4] from phase [4]
    [Slower producer 2] working in lane2 finished phase [5]
    [Producer 1       ] working in lane1 finished phase [6]
    [Slower producer 2] working in lane2 finished phase [6]
    [Slow consumer    ] about to wait for phase [5] completion
    [Slow consumer    ] read [lane1-answer-5] & [lane2-answer-5] from phase [5]
    [Slow consumer    ] read [lane1-answer-6] & [lane2-answer-6] from phase [6]
    [Slower producer 2] working in lane2 finished phase [7]
    [Producer 1       ] working in lane1 finished phase [7]
    [Producer 1       ] working in lane1 finished phase [8]
    [Slower producer 2] working in lane2 finished phase [8]
    [Producer 1       ] working in lane1 finished phase [9]
    [Slow consumer    ] about to wait for phase [7] completion
    [Slow consumer    ] read [lane1-answer-7] & [lane2-answer-7] from phase [7]
    [Slow consumer    ] read [lane1-answer-8] & [lane2-answer-8] from phase [8]
    [Slower producer 2] working in lane2 finished phase [9]
    [Slow consumer    ] about to wait for phase [9] completion
    [Slow consumer    ] read [lane1-answer-9] & [lane2-answer-9] from phase [9]
    
  • 相关阅读:
    学习进度表
    mysql实现跨库查询
    jmeter分布式(1台Windows,一台Mac,亲测可用互相使用)
    解决appium 连接真机Android 9启动报错.....shell "ps 'uiautomator'
    使用fiddler抓包修改请求/返回的数据
    adb 获取当前界面activity
    使用adb 命令获取APP包名
    jmeter实现登录并设置token为全局变量
    python3 SystemError: Parent module '' not loaded, cannot perform relative import
    adb 运行提示error: cannot connect to daemon
  • 原文地址:https://www.cnblogs.com/suxuan/p/4948760.html
Copyright © 2011-2022 走看看