zoukankan      html  css  js  c++  java
  • Java并发包中的线程同步器

    10.1CountDownLatch

    计数器

    new CountDownLatch(2)
    countDownLatch.countDown(); //-1 countDownLatch.await();//当计数器为0时返回

    和join之间的区别

    不用等到子进程全部执行完毕之后再返回

    是基于AQS实现的

    AQS中的state用来计数了

    void await()方法

    阻塞调用线程,当计数器值为0时,返回咯

    void await(long timeout, TimeUnit unit)

    当计数器为0时返回;或者timeoout之后

    void countDown()

    state减1 tryReleaseShared()

    void getCount()

    获取当前的state值

    小结

    • 比join方法灵活实现线程同步
    • CountDownLatch使用AQS实现
    • 初始化CountDownLatch时设置状态值,当多个线程调用countdown方法时实际是原子性递减AQS的状态值。
    • 当线程调用await方法后当前线程会被放入AQS的阻塞队列等待计数器0再返回。其他线程调用countdown方法让计数器递减1,当计数器值变为0时,当前线程还要调用AQS的doReleaseShared方法来激活由于调用await()方法而被阻塞的线程

    CyclicBarrier回环屏障

    CountDownLatch的计数器是一次性的

    CyclicBarrier:回环屏障,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CycleBarrierTest {
    
        private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread() + "task1 merge result");
            }
        });
    
        public static void main(String[] args) {
    
            ExecutorService executorService = Executors.newFixedThreadPool(2);
    
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("one in");
                    try {
                        cyclicBarrier.await();
                        System.out.println("one out");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("two in");
                    try {
                        cyclicBarrier.await();
                        System.out.println("two out");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            executorService.shutdown();
        }
        /**
         * one in
         * two in
         * Thread[pool-1-thread-2,5,main]task1 merge result
         * two out
         * one out
         */
    }

    创建了一个CyclicBarrier对象,其中第一个参数为计数器初始值,第二个参数Runable是当计数器值为0时需要执行的任务。 当第一个线程调用await方法时,计数器减1,第二个线程调用await方法时,减1。如果当前cyclicBarrier中的计数值不等于0时,就线程12都锁住

    当cyclicBarrier的值等于0时,才会去执行线程12的任务。然后cyclicBarrier被重置。

    实现原理探究

    • CyclicBarrier基于独占锁实现,本质底层还是基于AQS
    • parties用来记录需要的线程个数,表示有多少个线程调用了await
    • count一开始等于parties,每当一个线程await了,就递减1,等于0的时候就突破了内存屏障。然后重新赋值count

    int await()方法

    等待,阻塞;或者异常退出

    Boolean await(long timeout, TimeUnit unit)

    等待一定的时间后,如果没有突破屏障,也会返回

    int dowait(boolean timed, long nanos)

    如果count == 0了,先执行CyclicBarrier自己的方法,再那唤醒所有的trip条件变量的阻塞进程,

    如果count != 0  也没有设置等待,那就直接把线程放进阻塞队列,当前会被挂起并释放lock锁。如果当前线程设置了超时时间,被放进条件变量trip的阻塞队列,不过过段时间后会自动返回

    小结

    CycleBarrier可以复用

    Semaphore

    内部计数器是递增,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要直到需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest {
    
        private static Semaphore semaphore = new Semaphore(0);
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread() + "start");
                    semaphore.release();
                    System.out.println(Thread.currentThread() + "over");
    
                }
            });
    
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread() + "start");
                    semaphore.release();
                    System.out.println(Thread.currentThread() + "over");
    
                }
            });
    
            try {
                semaphore.acquire(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("all child thread over");
    
            executorService.shutdown();
        }
        /**
         * Thread[pool-1-thread-1,5,main]start
         * Thread[pool-1-thread-1,5,main]over
         * Thread[pool-1-thread-2,5,main]start
         * Thread[pool-1-thread-2,5,main]over
         * all child thread over
         */
    }

    阻塞当前main线程,当信号量到达release和初始化值之和时,才能解封当前线程。

    实现原理探究

    1. AQS实现
    2. 可以创建Semaphore的时候指定是否公平
    3. 用state值表示当前持有的信号量个数

    void acquire()方法

    当前线程调用该方法的目的是希望获取一个信号量资源。如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。否则如果当前信号量个数等于0,则当前线程会被放入AQS的阻塞队列。或者中断线程。

    当然有公平的实现和非公平的实现;

    void acquire(int permits)方法

    获取多个信号量的值,满足就唤醒执行,不满足就不执行呗。

    void acquireUninterruptibly()方法

    不响应中断

    void acquireUninterruptibly(int permits)方法

    获取指定个数

    不响应中断

    void release()方法

    增加一个信号量,就是state + 1

    void release(int permits)方法

    释放多个信号量

    小结

    Semaphore的计数器是不可以自动重置的。通过变相改变acquire的参数实现CycleBarrier的功能。AQS实现。

    总结

    1. 首先CountDownlatch通过计数器提供更灵活的控制,只要控制到计数器值为0,就可以往下执行。
    2. CyclicBarrier也可以达到CountDownLatch的效果,但是可以复用
    3. Semaphore采用信号递增策略,一开始并不需要关心同步的线程个数,只需要指定acquire的个数就行,并且提供信号量的公平策略和非公平策略。
  • 相关阅读:
    Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
    Centos安装openjdk
    centos7安装nodejs
    Linux服务器集群性能监控之Performance Co-Pilot(PCP)部署
    asp.net(c#)网页跳转七种方法小结
    C# 获取文件名及扩展名
    C#中的 具名参数 和 可选参数
    数据契约(DataContract)
    用LINQ在集合中查询特定对象
    jQuery中append、insertBefore、after与insertAfter方法注意事项
  • 原文地址:https://www.cnblogs.com/sicheng-li/p/13205515.html
Copyright © 2011-2022 走看看