zoukankan      html  css  js  c++  java
  • Phaser相位(工具的实战案例使用)

    Phaser没指定parties的时候需要先注册

    package com.dwz.phaser;
    
    import java.util.Random;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    
    /**
     * Demonstrates dynamic registration on a {@link Phaser} that is created without an initial
     * party count.
     *
     * <p>Every participant has to be registered before it arrives. The main thread registers itself
     * before any worker is created, so phase 0 cannot complete until all workers exist and the
     * main thread has arrived as well.
     */
    public class PhaserExample {
        private static final Random random = new Random(System.currentTimeMillis());

        /** Number of worker threads started by {@link #main(String[])}. */
        private static final int WORKER_COUNT = 5;

        /** Worker that registers with the phaser, simulates some work and then waits at the barrier. */
        static class Task extends Thread {
            private final Phaser phaser;

            /**
             * Registers this worker as a party of {@code phaser} and starts the thread.
             *
             * <p>The registration is done here, on the creating thread, so the party is counted
             * before the worker itself can arrive.
             *
             * @param phaser the barrier shared by all workers
             */
            public Task(Phaser phaser) {
                this.phaser = phaser;
                this.phaser.register();
                start();
            }

            /** Sleeps for 0-4 seconds and then waits until all registered parties have arrived. */
            @Override
            public void run() {
                System.out.println("The worker [" + getName() + "] is working ...");
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(5));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the flag for whoever inspects it later; the worker still arrives
                    // below so the other parties are not left waiting for it.
                    Thread.currentThread().interrupt();
                }

                phaser.arriveAndAwaitAdvance();
            }
        }

        /**
         * Starts {@link #WORKER_COUNT} workers and waits until all of them have finished.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            final Phaser phaser = new Phaser();

            // Register the main thread BEFORE starting any worker. If the workers were the only
            // registered parties, a fast worker could arrive while it is the sole party, advance
            // the phase on its own and terminate while still registered; the main thread would
            // then wait forever for a party that never arrives again.
            phaser.register();

            for (int i = 0; i < WORKER_COUNT; i++) {
                new Task(phaser);
            }

            phaser.arriveAndAwaitAdvance();
            System.out.println("All of task finished the task.");
        }
    }

    run() 中每调用一次 phaser.arriveAndAwaitAdvance() 就消耗 Phaser 的一个阶段(phase):只有所有已注册的参与者都到达后,才会进入下一阶段

    package com.dwz.phaser;
    
    import java.util.Random;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    /**
     * Sends five athletes through three consecutive events that are synchronized by one
     * {@link Phaser}.
     *
     * <p>Every {@code arriveAndAwaitAdvance()} call made in {@code run()} consumes one phase. The
     * next event only starts once all registered athletes have arrived in the current phase, i.e.
     * while {@code getPhase()} still reports the same value for each of them.
     */
    public class PhaserExample2 {
        private static final Random random = new Random(System.currentTimeMillis());

        /** Number of athletes; also the number of parties the phaser is created with. */
        private static final int ATHLETE_COUNT = 5;

        /** One competitor that takes part in running, bicycle and long jump, in that order. */
        static class Athletes extends Thread {
            private final int num;
            private final Phaser phaser;

            /**
             * @param num number used to identify this athlete in the output
             * @param phaser barrier shared by all athletes; this athlete must already be counted
             *     among its parties
             */
            public Athletes(int num, Phaser phaser) {
                this.num = num;
                this.phaser = phaser;
            }

            /** Competes in the three events, waiting for all other athletes after each one. */
            @Override
            public void run() {
                try {
                    compete("running");
                    compete("bicycle");
                    compete("long jump");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                    // The interrupt can only come from the sleep inside compete(), i.e. before
                    // this athlete arrived in the current phase. Leave the phaser so the
                    // remaining athletes do not wait forever for someone who has quit.
                    phaser.arriveAndDeregister();
                }
            }

            /**
             * Performs one event and then waits at the barrier for the other athletes.
             *
             * @param event name of the event as it appears in the output
             * @throws InterruptedException if the thread is interrupted while the event is running
             */
            private void compete(String event) throws InterruptedException {
                System.out.println(num + ": start " + event + "...");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(num + ": end " + event + "...");
                System.out.println(num + "getPhase()=>" + phaser.getPhase());
                phaser.arriveAndAwaitAdvance();
            }
        }

        /**
         * Starts all athletes.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            final Phaser phaser = new Phaser(ATHLETE_COUNT);

            for (int i = 0; i < ATHLETE_COUNT; i++) {
                new Athletes(i, phaser).start();
            }
        }
    }

    Phaser 的参与者可以动态注册(register),也可以动态注销(arriveAndDeregister)

    package com.dwz.phaser;
    
    import java.util.Random;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    /**
     * Shows that parties of a {@link Phaser} can be deregistered dynamically while the remaining
     * parties keep completing the work phase by phase.
     *
     * <p>One athlete gets injured and never performs the long jump. If that athlete simply stopped,
     * all other athletes would keep waiting for it and their threads could not terminate. The
     * injured athlete therefore deregisters from the phaser when it leaves.
     */
    public class PhaserExample3 {
        private static final Random random = new Random(System.currentTimeMillis());

        /** Athlete that finishes running and bicycle and then drops out of the competition. */
        static class InjuredAthletes extends Thread {
            private final int num;
            private final Phaser phaser;

            /**
             * @param num number used to identify this athlete in the output
             * @param phaser barrier shared by all athletes
             */
            public InjuredAthletes(int num, Phaser phaser) {
                this.num = num;
                this.phaser = phaser;
            }

            /** Completes two events and then leaves the phaser instead of doing the long jump. */
            @Override
            public void run() {
                try {
                    sport(phaser, num + ": start running...", num + ": end running...");

                    sport(phaser, num + ": start bicycle...", num + ": end bicycle...");

                    System.out.println("Oh shit, i am injured, i will exited.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                } finally {
                    // Deregister on every way out (injury or interrupt) so the other athletes
                    // stop waiting for this one.
                    phaser.arriveAndDeregister();
                }
            }
        }

        /** Athlete that completes all three events. */
        static class Athletes extends Thread {
            private final int num;
            private final Phaser phaser;

            /**
             * @param num number used to identify this athlete in the output
             * @param phaser barrier shared by all athletes
             */
            public Athletes(int num, Phaser phaser) {
                this.num = num;
                this.phaser = phaser;
            }

            /** Competes in the three events, waiting for the other athletes after each one. */
            @Override
            public void run() {
                try {
                    sport(phaser, num + ": start running...", num + ": end running...");

                    sport(phaser, num + ": start bicycle...", num + ": end bicycle...");

                    sport(phaser, num + ": start long jump...", num + ": end long jump...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                    // The interrupt hits the sleep inside sport(), before this athlete arrived in
                    // the current phase; leave the phaser so nobody waits for it forever.
                    phaser.arriveAndDeregister();
                }
            }
        }

        /**
         * Performs one event: prints {@code str1}, sleeps for 0-4 seconds, prints {@code str2} and
         * then waits until every registered party has arrived.
         *
         * @param phaser barrier to wait on after the event
         * @param str1 message printed when the event starts
         * @param str2 message printed when the event ends
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        private static void sport(Phaser phaser, String str1, String str2) throws InterruptedException {
            System.out.println(str1);
            TimeUnit.SECONDS.sleep(random.nextInt(5));
            System.out.println(str2);
            phaser.arriveAndAwaitAdvance();
        }

        /**
         * Starts four regular athletes (1-4) and one injured athlete (5).
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Five parties: four regular athletes plus the injured one.
            final Phaser phaser = new Phaser(5);

            for (int i = 1; i < 5; i++) {
                new Athletes(i, phaser).start();
            }

            new InjuredAthletes(5, phaser).start();
        }
    }

    phaser的onAdvance方法返回值true和false

    返回值false时:Phaser不会终止,后续的phaser.arriveAndAwaitAdvance()可能会产生阻塞,等待未到达的参与者;返回值true时:Phaser被终止,arriveAndAwaitAdvance()会立即返回

    package com.dwz.phaser;
    
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Shows how the value returned by {@link Phaser#onAdvance(int, int)} affects later phases.
     */
    public class PhaserExample4 {
        /**
         * Thread that passes the barrier once and, if it is named "Alex", a second time after a
         * one-second pause.
         */
        static class OnAdvanceTask extends Thread {
            private final Phaser phaser;

            /**
             * @param name thread name; "Alex" triggers the second round
             * @param phaser barrier shared by the tasks
             */
            OnAdvanceTask(String name, Phaser phaser) {
                super(name);
                this.phaser = phaser;
            }

            @Override
            public void run() {
                passBarrier();

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // Only Alex goes for a second round; every other task is finished here.
                if (!"Alex".equals(getName())) {
                    return;
                }
                passBarrier();
            }

            /** Prints the phase number before and after waiting for the other parties. */
            private void passBarrier() {
                final String who = getName();
                System.out.println(who + " I am start and the phaser " + phaser.getPhase());
                phaser.arriveAndAwaitAdvance();
                System.out.println(who + " I am end and the phaser " + phaser.getPhase());
            }
        }

        /**
         * Starts the two tasks and, two seconds later, prints the arrived and unarrived party
         * counts of the phaser.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws InterruptedException {
            // Returning false keeps the phaser alive after each phase, so Alex's second
            // arriveAndAwaitAdvance() waits for a second party that never arrives. Returning true
            // instead would terminate the phaser and let that call return immediately.
            final Phaser barrier = new Phaser(2) {
                @Override
                protected boolean onAdvance(int phase, int registeredParties) {
                    return false;
                }
            };

            for (String name : new String[] {"Alex", "Jack"}) {
                new OnAdvanceTask(name, barrier).start();
            }

            TimeUnit.SECONDS.sleep(2);
            final int arrived = barrier.getArrivedParties();
            System.out.println(arrived);
            final int unarrived = barrier.getUnarrivedParties();
            System.out.println(unarrived);
        }
    }

    arrive() 只登记到达,不会阻塞调用线程,而是立即返回

    示例代码:

    package com.dwz.phaser;
    
    import java.util.Random;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates {@link Phaser#arrive()}: a task reports its arrival and carries on with other
     * work right away, while the main thread waits for all arrivals.
     */
    public class PhaserExample5 {
        private static final Random random = new Random(System.currentTimeMillis());

        /**
         * Starts four tasks and waits until each of them has arrived at the phaser.
         *
         * <p>{@code arrive()} does not block the calling task; it returns immediately.
         *
         * @param args unused
         * @throws InterruptedException declared by the original signature
         */
        public static void main(String[] args) throws InterruptedException {
            // Five parties: the four tasks plus the main thread.
            final Phaser barrier = new Phaser(5);
            int id = 0;
            while (id < 4) {
                new ArriveTask(barrier, id).start();
                id++;
            }

            // Wait until the work each task does before its arrive() call has completed.
            barrier.arriveAndAwaitAdvance();
            System.out.println("The phaser 1 work finished done.");
        }

        /** Task that arrives at the phaser after a first piece of work and then keeps working. */
        private static class ArriveTask extends Thread {
            private final Phaser phaser;

            /**
             * @param phaser barrier to arrive at
             * @param no task number, used as the thread name
             */
            public ArriveTask(Phaser phaser, int no) {
                super(String.valueOf(no));
                this.phaser = phaser;
            }

            @Override
            public void run() {
                final String self = getName();

                System.out.println(self + " start working.");
                pauseRandomly();
                System.out.println(self + " the phaser one is running.");
                // Report the arrival without waiting for anybody else.
                phaser.arrive();

                pauseRandomly();
                System.out.println(self + " keep to do other thing.");
            }
        }

        /** Sleeps for 0-4 seconds; an interrupt is only reported on standard error. */
        private static void pauseRandomly() {
            final int seconds = random.nextInt(5);
            try {
                TimeUnit.SECONDS.sleep(seconds);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    抛出问题:awaitAdvance 会减少已到达的 parties 数量吗?(不会,awaitAdvance 只是等待,对 parties 没有影响)

    package com.dwz.phaser;
    
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    /**
     * Explores the question: does {@code awaitAdvance} decrement the arrived parties?
     *
     * <p>{@code Phaser.awaitAdvance(int phase)} blocks only when {@code phase} equals the value
     * currently returned by {@code getPhase()}; for any other value it does not block. It has no
     * effect on the parties of the phaser.
     */
    public class PhaserExample6 {
        /**
         * Starts six tasks and uses {@code awaitAdvance} to observe when all of them have passed
         * {@code arriveAndAwaitAdvance()}.
         *
         * @param args unused
         * @throws InterruptedException declared by the original signature
         */
        public static void main(String[] args) throws InterruptedException {
            // A simpler experiment for the question above: create a Phaser(6), call
            // awaitAdvance(getPhase()) on a separate thread, sleep three seconds and print
            // getArrivedParties() / getUnarrivedParties() to inspect the party counts.

            final Phaser barrier = new Phaser(6);
            for (int created = 0; created <= 5; created++) {
                // The constructor starts the thread.
                new AwaitAdvanceTask(barrier);
            }

            // Watch for the moment all tasks have completed arriveAndAwaitAdvance().
            final int currentPhase = barrier.getPhase();
            barrier.awaitAdvance(currentPhase);
            System.out.println("========================");
        }

        /** Task that sleeps five seconds, waits at the barrier and then reports that it is done. */
        private static class AwaitAdvanceTask extends Thread {
            private final Phaser phaser;

            /**
             * Stores the phaser and starts this thread.
             *
             * @param phaser barrier shared by all tasks
             */
            public AwaitAdvanceTask(Phaser phaser) {
                this.phaser = phaser;
                start();
            }

            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                phaser.arriveAndAwaitAdvance();
                final String self = getName();
                System.out.println(self + " finished work.");
            }
        }
    }

    打断相位执行:phaser.awaitAdvanceInterruptibly 与thread.interrupt()的配合使用

    package com.dwz.phaser;
    
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Interrupts a thread that is waiting on a {@link Phaser} by combining
     * {@code awaitAdvanceInterruptibly} with {@code Thread.interrupt()}.
     */
    public class PhaserExample7 {
        /**
         * Starts a thread that waits for the current phase to advance and interrupts it after ten
         * seconds.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws InterruptedException {
            // Contrast experiment: run phaser::arriveAndAwaitAdvance on the thread instead and
            // interrupt it the same way. Per the Phaser documentation that wait is not
            // interruptible, so the thread keeps waiting.

            final Phaser barrier = new Phaser(3);

            // None of the three parties ever arrives, so this wait can only end by interruption.
            final Runnable waitForAdvance = () -> {
                try {
                    barrier.awaitAdvanceInterruptibly(barrier.getPhase());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            final Thread waiter = new Thread(waitForAdvance);

            waiter.start();
            System.out.println("===================");
            TimeUnit.SECONDS.sleep(10);

            waiter.interrupt();
            System.out.println("==============thread.interrupt()=============");
        }
    }

    相位销毁phaser.forceTermination()

    package com.dwz.phaser;
    
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    /**
     * Terminates a {@link Phaser} with {@code forceTermination()}.
     */
    public class PhaserExample8 {
        /**
         * Prints {@code isTerminated()} before and after forcing the phaser to terminate.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws InterruptedException {
            final Phaser barrier = new Phaser(3);

            // Only one of the three parties arrives, so this thread is left waiting.
            final Thread waiter = new Thread(() -> barrier.arriveAndAwaitAdvance());
            waiter.start();

            TimeUnit.SECONDS.sleep(3);
            final boolean terminatedBefore = barrier.isTerminated();
            System.out.println(terminatedBefore);

            barrier.forceTermination();
            final boolean terminatedAfter = barrier.isTerminated();
            System.out.println(terminatedAfter);
        }
    }
  • 相关阅读:
    python编程学习进度二
    python编程学习进度一
    阅读笔记(6)-《高效程序员的45个习惯》
    阅读笔记(5)-《高效程序员的45个习惯》
    阅读笔记(4)-《高效程序员的45个习惯》
    阅读笔记(3)-《高效程序员的45个习惯》
    阅读笔记(2)-《高效程序员的45个习惯》
    寒假生活15
    寒假生活14(补)
    寒假生活13
  • 原文地址:https://www.cnblogs.com/zheaven/p/13410661.html
Copyright © 2011-2022 走看看