zoukankan      html  css  js  c++  java
  • JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

    Join 方法可以使当前线程等待子线程,如果子线程未结束,则会一致处在wait状态。

    因为其内部是通过wait 方法实现的,当执行完毕后会调用notifyAll 释放锁。

    CountDownLatch 允许一个或多个线程等待其他线程完成操作,相比join ,能做的事情更多。

    private static CountDownLatch countDownLatch = new CountDownLatch(2);
        public static void main(String[] args){
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("begin");
                    countDownLatch.countDown();
                    System.out.println("middle");
                    countDownLatch.countDown();
                }
            });
            t.start();
            try {
                countDownLatch.await();
                System.out.println("end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    输出:

    由结果看出,实现的效果与join相同。

    对比join ,其构造函数有一个int参数,表示计数器,可以手动控制其需要等待的次数。每次调用countDown 会减去1,其wait方法会一直等待计数器变成0。

    用法更加灵活,计数参数N 可以代表N个线程、N个步骤,总之可以自由的控制。

    如果调用await(long time,TimeUnit unit)方法等待,那么当前线程在等待一定时间后就不会再做等待,而是继续执行当前线程(需要注意的地方是,此时子线程还是在执行中的)

    Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("begin");
                    countDownLatch.countDown();
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("middle");
                    countDownLatch.countDown();
                }
            });
            t.start();
            try {
                countDownLatch.await(2, TimeUnit.SECONDS);
                System.out.println("end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

    输出:

     结果看出,主线程main在等待2秒后,发现子线程还是没有执行完毕,则继续执行,此时子线程并没有关闭,所以在等待到达5秒后,继续执行子线程输出middle。

     CyclicBarrier 同步屏障

    说明:中文意思为循环的屏障,要求指定线程到达屏障点,会可继续执行,如:

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
        public static void main(String[] args){
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"begin");
                    try {
                        cyclicBarrier.await();
                        System.out.println(Thread.currentThread().getName() + "end");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            };
            Thread t = new Thread(runnable,"A");
            Thread t2 = new Thread(runnable,"B");
            t.start();
            t2.start();
    
            try {
                cyclicBarrier.await();
                System.out.println("main end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

    输出:

     由结果看出,因为屏障点设置的数量为4,实际上执行了await方法(调用一次则减一)只有3个,2个子线程和一个main线程,所以大家都没得玩了,如果把计算点改为3,就正常了(当计数器为0时,会通知所有阻塞的线程可以继续执行了)

    如果计算器过小,比如有3个线程调用了wait,而计数器设置为2,那么前面2个先执行wait的线程会停止阻塞,计算器又会从2开始计算,也就是说,还需要一个线程调用await来释放它,如下:

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        public static void main(String[] args){
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"begin");
                    try {
                        cyclicBarrier.await();
                        System.out.println(Thread.currentThread().getName() + "end");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            };
            Thread t = new Thread(runnable,"A");
            Thread t2 = new Thread(runnable,"B");
            Thread t3 = new Thread(runnable,"C");
            t.start();
            t2.start();
            t3.start();
            try {
                cyclicBarrier.await();
                System.out.println("main end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

    输出

    关于CyclicBarrier 的构造函数还有一种用法,就是优先执行,也就是说如果我们设置计算器为2,在线程A与线程B调用await后,在停止阻塞AB之前会优先执行默认的线程,如下:

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                System.out.println("先执行我");
            }
        });
        public static void main(String[] args){
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"begin");
                    try {
                        cyclicBarrier.await();
                        System.out.println(Thread.currentThread().getName() + "end");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            };
            Thread t = new Thread(runnable,"A");
            t.start();
            try {
                cyclicBarrier.await();
                System.out.println("main end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

    输出:

    A线程先执行了输出了begin,然后到达屏障开始等待,此时主线程也同时调用了await,计算器的值2 - 1 - 1 = 0 ,准备释放A 和main,但是要先执行第二个参数线程,所以会先输出“先执行我”,然后A和main线程从block状态转换为runable 状态。

    场景:计算多个线程的结果,利用第二个参数(线程)。如:

    private CyclicBarrier barrier = new CyclicBarrier(4,this);
    
        private Executor executor = Executors.newFixedThreadPool(4);
    
        // 保存
        private ConcurrentMap<String,Integer> map = new ConcurrentHashMap<>();
    
        private void count(){
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // 记入运算结果
                    map.put(Thread.currentThread().getName(),1);
                    try {
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            };
            for (int i = 0; i < 4; i++) {
                executor.execute(runnable);
            }
        }
    
        @Override
        public void run() {
            int result = 0;
            for (Map.Entry<String,Integer> entry:map.entrySet()) {
                result += entry.getValue();
            }
            //
            map.put("result",result);
            System.out.println(result);
        }
    
        public static void main(String[] args){
            TestCount testCount = new TestCount();
            testCount.count();
        }

    输出:

    结果输出4。

    CountDownLatch 与CyclicBarrier:

    CyclicBarrier 对比CountDownLatch 还可以调用reset方法重置。,让线程重新执行一次。


    Semaphore:信号

    可以控制并发量工具,比如有100个连接需要获取数据库连接,保存数据,但是数据库连接池最大连接数为10,那么可以通过这个工具类来控制,如下:

    private static final int COUNT = 100;
    
        private static ExecutorService threadPool = Executors.newFixedThreadPool(COUNT);
    
        private static Semaphore semaphore = new Semaphore(10);
    
        public  static  void main(String[] args){
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println("保存数据");
                        Thread.sleep(2000);
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            for (int i = 0; i < COUNT; i++) {
                threadPool.execute(runnable);
            }
    
            threadPool.shutdown();
        }

    输出:

     在代码中执行会发现,每次只有10个线程在执行,这就是信号灯的作用~ acquire 方法意为获取许可证,release方法意为归还许可证。

    Exchanger:

    线程之间交换数据彼此的数据,比如A线程执行到指定同步点后会等待B线程,当B线程也到达时,交换数据,然后各自拜拜~

    private static Exchanger<String> exchanger = new Exchanger<>();
    
        private static ExecutorService pool = Executors.newFixedThreadPool(2);
    
        public static void main(String[] args){
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        String what = exchanger.exchange("钞票");
                        System.out.println("我是线程A,我拿到了"+what);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        String what = exchanger.exchange("香烟");
                        System.out.println("我是线程B,我拿到了"+what);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            pool.shutdown();
        }

    输出:

     由结果看出,线程A和线程B各自拿到了自己想要的东西~~

     总结:平时使用的最多的应该是CountDownLatch,因为使用场景的关系,用它用的会比较多,这几个工具类各有特点~

    像是CyclicBarrier 在执行一些大型计算的时候也许会用到,Semaphore对于一些共享资源的控制,Exchanger对于需要交换信息比较合适,一般是不同的事情并行处理会比较好,而且是必须要交换数据。

    对于像是解析excel,json文件执行导入数据的操作个人认为使用CountDownLatch 就足够了~~ 哈哈

    成灰之前,抓紧时间做点事!!
  • 相关阅读:
    COJ 1002 WZJ的数据结构(二)(splay模板)
    生成网络流图
    最小费用最大流MCMF zkw费用流
    COJ 2003 选根 (树的重心)
    最小费用最大流MCMF 最小增广
    PDO 基础知识
    使 用 Jquery 全选+下拉+单选+事件+挂事件
    搜 房 网 站 设 计 练 习
    百分比进度条
    在PHP系统里连接MySQL 数据访问,+ + + + + 数据删除
  • 原文地址:https://www.cnblogs.com/jony-it/p/10846964.html
Copyright © 2011-2022 走看看