zoukankan      html  css  js  c++  java
  • 线程的三个同步器



    不知不觉就遇到了线程同步器问题,查了资料写下了总结


    1. CountDownLatch

    日常中会有开启多个线程去并发执行任务,而主线程要等所有子线程执行完之后才能运行的需求。之前我们是使用Thread.join方法来实现的,过程如下:

    public static void main(String[] args) throws InterruptedException {
    
        Thread t1 = new Thread( () -> {
            try {
                Thread.sleep(1000);
                System.out.println("t1 over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    
        Thread t2 = new Thread( () -> {
            try {
                Thread.sleep(2000);
                System.out.println("t2 over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    
        t1.start();
        t2.start();
        
        t1.join();
        t2.join();
    
        System.out.println("mian over");
    }
    
    t1 over
    t2 over
    mian over
    


    join()方法不够灵活,现在JDK提供了CountDownLatch这个类来实现所需功能

    private static CountDownLatch countDownLatch = new CountDownLatch(2);
    
    public static void main(String[] args) throws InterruptedException {
    
        ExecutorService t = Executors.newCachedThreadPool();
    
        Runnable r1 = () -> {
            try {
                System.out.println("r1 sleep");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        };
    
        Runnable r2 = () -> {
            try {
                System.out.println("r2 sleep");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        };
    
        t.submit(r1);
        t.submit(r2);
    
        System.out.println("main wait");
        countDownLatch.await();
        System.out.println("main over");
    }
    
    main wait
    r1 sleep
    r2 sleep
    main over
    


    CountDownLatch流程:

    • 新建CountDownLatch实例,传入计数器次数
    • 主线程调用CountDownLatch.await()方法后会被阻塞
    • 子线程中在某处调用CountDownLatch.countDown()方法可使内部计数器减1
    • 当计数器变成0时,主线程的await()方法才会返回

    CountDownLatch优点:

    • 调用Thread.join()调用线程会被阻塞至子线程运行完毕,而CountDownLatch.countDown()可在线程运行中执行
    • 使用线程池时是提交任务的,而没有接触到线程无法使用线程方法,那么countDown()可加在Runnable中执行

    CountDownLatch原理:

    内部维护了一个计数器,当计数器为0就放行,源码就不放了,熟悉AQS的同学想想就知道怎么回事

    • 继承了AQS,其实就是用AQS的state来表示计数器
    • await()方法内部有acquireSharedInterruptibly(),后者调用了重写tryaquireShared()其实就是判断计数器是否为0,不为0则阻塞进AQS队列
    • countDown()方法内部有releaseShared(),后者调用了重写tryReleaseShared()计数器减一,若为0,则唤醒阻塞线程




    2. CyclicBarrier

    满足多个线程都到达同一个位置后才全部开始运行的需求。CountDownLatch是一次性使用的,计数器为0后再次调用会直接返回,此时升级版的CyclicBarrier来了,其一可以满足计数器重置功能,且二还可以让一组线程达到一个状态后再全部同时执行


    场景要求:假设一个任务分为3个阶段,每个线程要串行地从低阶段执行到高阶段

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, 
                            () -> System.out.println("一个阶段完成"));
    
    public static void main(String[] args) throws InterruptedException {
    
        ExecutorService service = Executors.newCachedThreadPool();
    
        Runnable r1 = () -> {
            try {
                System.out.println(Thread.currentThread() + "Step1");
                cyclicBarrier.await();
    
                System.out.println(Thread.currentThread() + "Step2");
                cyclicBarrier.await();
    
                System.out.println(Thread.currentThread() + "Step3");
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    
        Runnable r2 = () -> {
            try {
                System.out.println(Thread.currentThread() + "Step1");
                cyclicBarrier.await();
    
                System.out.println(Thread.currentThread() + "Step2");
                cyclicBarrier.await();
    
                System.out.println(Thread.currentThread() + "Step3");
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    
        service.submit(r1);
        service.submit(r2);
    
        service.shutdown();
    }
    
    Thread[pool-1-thread-1,5,main]Step1
    Thread[pool-1-thread-2,5,main]Step1
    一个阶段完成
    Thread[pool-1-thread-1,5,main]Step2
    Thread[pool-1-thread-2,5,main]Step2
    一个阶段完成
    Thread[pool-1-thread-1,5,main]Step3
    Thread[pool-1-thread-2,5,main]Step3
    


    CyclicBarrier的流程

    • 和上面差不多就不一一解释了
    • CyclicBarrier的构造方法中,第一个参数为计数器次数,第二个为阶段结束后要执行的方法

    CyclicBarrier的原理

    • 基于独占锁,底层是AQS实现,独占锁可以原子性改变计数器,以及条件队列阻塞线程来实现线程同步
    • 内部有parties和count变量,实现重置功能
    • await()方法内调用dowait()方法
      • 获取锁更新次数减一
      • 没有为0,阻塞当前线程加入条件队列
      • 为0执行屏蔽点任务,然后唤醒条件队列的全部线程




    3. Semaphore

    不同与前两者,Semaphore信号量内部计数器是递增的,在需要同步的地方调用acquire指定需要同步的个数即可


    private static Semaphore semaphore = new Semaphore(0);
    
    public static void main(String[] args) throws InterruptedException {
    
        ExecutorService service = Executors.newCachedThreadPool();
    
        Runnable r1 = () -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread() + "over");
            semaphore.release();;
        };
    
        Runnable r2 = () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread() + "over");
            semaphore.release();
        };
    
        service.submit(r1);
        service.submit(r2);
    
        semaphore.acquire(2);
        System.out.println("All child thread over");
    
        service.shutdown();
    }
    


    Semaphore的流程

    • Semaphore的构造函数传参复制当前计数器的值
    • 每个线程内部调用release()即计数器加1
    • 主线程调用acquire()方法传参为2 ,会被阻塞至计数器到达2

    Semaphore的原理

    • 底层还是使用AQS,提供了公平与非公平,也是用state表示次数
    • acquire()方法获取一个信号量,并且state减一
      • 若为0,直接返回
      • 不为0当前线程会被加入AQS阻塞队列
    • release()方法,把当前Semaphore的信号量加1,然后会选择一个信号量满足的线程进行激活
    • 内部还实现了公平与非公平策略


  • 相关阅读:
    中介模式与外观模式(门面模式)区别
    java反射
    Spring注解@ResponseBody,@RequestBody
    Spring事务管理
    Junit运行在Spring环境下
    java开发常用到的jar包总结
    java二维数组
    Android开发之执行定时任务AlarmManager,Timer,Thread
    Android开发之Android Context,上下文(Activity Context, Application Context)
    Android开发之创建App Widget和更新Widget内容
  • 原文地址:https://www.cnblogs.com/Howlet/p/12685072.html
Copyright © 2011-2022 走看看