zoukankan      html  css  js  c++  java
  • 线程的三个同步器



    不知不觉就遇到了线程同步器问题,查了资料写下了总结


    1. CountDownLatch

    日常中会有开启多个线程去并发执行任务,而主线程要等所有子线程执行完之后才能运行的需求。之前我们是使用Thread.join方法来实现的,过程如下:

    public static void main(String[] args) throws InterruptedException {
    
        Thread t1 = new Thread( () -> {
            try {
                Thread.sleep(1000);
                System.out.println("t1 over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    
        Thread t2 = new Thread( () -> {
            try {
                Thread.sleep(2000);
                System.out.println("t2 over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    
        t1.start();
        t2.start();
        
        t1.join();
        t2.join();
    
        System.out.println("mian over");
    }
    
    t1 over
    t2 over
    mian over
    


    join()方法不够灵活,现在JDK提供了CountDownLatch这个类来实现所需功能

    private static CountDownLatch countDownLatch = new CountDownLatch(2);
    
    public static void main(String[] args) throws InterruptedException {
    
        ExecutorService t = Executors.newCachedThreadPool();
    
        Runnable r1 = () -> {
            try {
                System.out.println("r1 sleep");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        };
    
        Runnable r2 = () -> {
            try {
                System.out.println("r2 sleep");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        };
    
        t.submit(r1);
        t.submit(r2);
    
        System.out.println("main wait");
        countDownLatch.await();
        System.out.println("main over");
    }
    
    main wait
    r1 sleep
    r2 sleep
    main over
    


    CountDownLatch流程:

    • 新建CountDownLatch实例,传入计数器次数
    • 主线程调用CountDownLatch.await()方法后会被阻塞
    • 子线程中在某处调用CountDownLatch.countDown()方法可使内部计数器减1
    • 当计数器变成0时,主线程的await()方法才会返回

    CountDownLatch优点:

    • 调用Thread.join()调用线程会被阻塞至子线程运行完毕,而CountDownLatch.countDown()可在线程运行中执行
    • 使用线程池时是提交任务的,而没有接触到线程无法使用线程方法,那么countDown()可加在Runnable中执行

    CountDownLatch原理:

    内部维护了一个计数器,当计数器为0就放行,源码就不放了,熟悉AQS的同学想想就知道怎么回事

    • 继承了AQS,其实就是用AQS的state来表示计数器
    • await()方法内部有acquireSharedInterruptibly(),后者调用了重写tryaquireShared()其实就是判断计数器是否为0,不为0则阻塞进AQS队列
    • countDown()方法内部有releaseShared(),后者调用了重写tryReleaseShared()计数器减一,若为0,则唤醒阻塞线程




    2. CyclicBarrier

    满足多个线程都到达同一个位置后才全部开始运行的需求。CountDownLatch是一次性使用的,计数器为0后再次调用会直接返回,此时升级版的CyclicBarrier来了,其一可以满足计数器重置功能,且二还可以让一组线程达到一个状态后再全部同时执行


    场景要求:假设一个任务分为3个阶段,每个线程要串行地从低阶段执行到高阶段

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, 
                            () -> System.out.println("一个阶段完成"));
    
    public static void main(String[] args) throws InterruptedException {
    
        ExecutorService service = Executors.newCachedThreadPool();
    
        Runnable r1 = () -> {
            try {
                System.out.println(Thread.currentThread() + "Step1");
                cyclicBarrier.await();
    
                System.out.println(Thread.currentThread() + "Step2");
                cyclicBarrier.await();
    
                System.out.println(Thread.currentThread() + "Step3");
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    
        Runnable r2 = () -> {
            try {
                System.out.println(Thread.currentThread() + "Step1");
                cyclicBarrier.await();
    
                System.out.println(Thread.currentThread() + "Step2");
                cyclicBarrier.await();
    
                System.out.println(Thread.currentThread() + "Step3");
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    
        service.submit(r1);
        service.submit(r2);
    
        service.shutdown();
    }
    
    Thread[pool-1-thread-1,5,main]Step1
    Thread[pool-1-thread-2,5,main]Step1
    一个阶段完成
    Thread[pool-1-thread-1,5,main]Step2
    Thread[pool-1-thread-2,5,main]Step2
    一个阶段完成
    Thread[pool-1-thread-1,5,main]Step3
    Thread[pool-1-thread-2,5,main]Step3
    


    CyclicBarrier的流程

    • 和上面差不多就不一一解释了
    • CyclicBarrier的构造方法中,第一个参数为计数器次数,第二个为阶段结束后要执行的方法

    CyclicBarrier的原理

    • 基于独占锁,底层是AQS实现,独占锁可以原子性改变计数器,以及条件队列阻塞线程来实现线程同步
    • 内部有parties和count变量,实现重置功能
    • await()方法内调用dowait()方法
      • 获取锁更新次数减一
      • 没有为0,阻塞当前线程加入条件队列
      • 为0执行屏蔽点任务,然后唤醒条件队列的全部线程




    3. Semaphore

    不同与前两者,Semaphore信号量内部计数器是递增的,在需要同步的地方调用acquire指定需要同步的个数即可


    private static Semaphore semaphore = new Semaphore(0);
    
    public static void main(String[] args) throws InterruptedException {
    
        ExecutorService service = Executors.newCachedThreadPool();
    
        Runnable r1 = () -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread() + "over");
            semaphore.release();;
        };
    
        Runnable r2 = () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread() + "over");
            semaphore.release();
        };
    
        service.submit(r1);
        service.submit(r2);
    
        semaphore.acquire(2);
        System.out.println("All child thread over");
    
        service.shutdown();
    }
    


    Semaphore的流程

    • Semaphore的构造函数传参复制当前计数器的值
    • 每个线程内部调用release()即计数器加1
    • 主线程调用acquire()方法传参为2 ,会被阻塞至计数器到达2

    Semaphore的原理

    • 底层还是使用AQS,提供了公平与非公平,也是用state表示次数
    • acquire()方法获取一个信号量,并且state减一
      • 若为0,直接返回
      • 不为0当前线程会被加入AQS阻塞队列
    • release()方法,把当前Semaphore的信号量加1,然后会选择一个信号量满足的线程进行激活
    • 内部还实现了公平与非公平策略


  • 相关阅读:
    C#利用反射动态调用类及方法
    系统程序监控软件
    SQL server 2008 安装和远程访问的问题
    sql server 创建临时表
    IIS 时间问题
    windows 2008 安装 sql server 2008
    sql server xml nodes 的使用
    Window 7sp1 安装vs2010 sp1 打开xaml文件崩溃
    CSS资源网址
    Could not load type 'System.ServiceModel.Activation.HttpModule' from assembly 'System.ServiceModel, Version=3.0.0.0
  • 原文地址:https://www.cnblogs.com/Howlet/p/12685072.html
Copyright © 2011-2022 走看看