zoukankan      html  css  js  c++  java
  • 并发编程之CountDownLatch demo与源码

    “ A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.” 这是CountDownLatch这个类要解决的问题:实现一个同步器,让一个或者多个线程一直等待,直到一组在其他线程中执行的操作完成。​

    在之前项目中碰到一个复杂查询,就是需要先分页查询出20条数据,然后根据事件类型对这20条数据分为4类,分别用线程查询这4类的特有信息,然后等所有的线程执行完成之后,在对这20条数据根据事件排序,最后返回给前端。因为是使用的线程查询,所以不知道什么时候会执行完。找了很久找到了方案,就是使用CountDownLatch。

    CountDownLatch和CyclicBarrier都是java.util.concurrent包下面的多线程工具类。今天只讲CountDownLatch,下次再来看CyclicBarrier。

    一、CountDownLatch

    1.CountDownLatch的作用:

    CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。

    2.CountDownLatch的应用场景:一个任务划分成多个任务执行。

    场景1:

    ​ 就拿上面的例子来说吧,线程1,2,3,4执行到栅栏位置的时候被阻塞,需要等待所有的线程都执行都得时候,才能打开栅栏,开始执行后面得排序方法。

    package cn.seven.countdownlatch;
    
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * ClassName:    Demo1
     * Package:    cn.seven.countdownlatch
     * Description: CountdownLatchTest01
     * Datetime:    2020/5/13   20:47
     *
     * @Author: charon
     */
    public class Demo1 {
    
        /**
         * @param args 参数
         */
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(4);
            final CountDownLatch latch = new CountDownLatch(4);
    
            System.out.println("主线程,"+Thread.currentThread().getName()+"执行到这里,分成4个线程执行");
    
            Runnable runnable0 = () -> {
                try {
                    System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(10000);
                    System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
                    // 当前线程调用此方法,则计数减一
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            executorService.execute(runnable0);
    
            Runnable runnable1 = () -> {
                try {
                    System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(11000);
                    System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
                    // 当前线程调用此方法,则计数减一
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            executorService.execute(runnable1);
    
            Runnable runnable2 = () -> {
                try {
                    System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(12000);
                    System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
                    // 当前线程调用此方法,则计数减一
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            executorService.execute(runnable2);
    
            Runnable runnable3 = () -> {
                try {
                    System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(13000);
                    System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
                    // 当前线程调用此方法,则计数减一
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            executorService.execute(runnable3);
    
            System.out.println("主线程"+Thread.currentThread().getName()+"等待子线程执行完成");
            //阻塞当前线程,直到计数器的值为0
            latch.await();
            System.out.println("主线程"+Thread.currentThread().getName()+"开始执行排序...");
        }
    }
    
    

    场景2:

    ​ 我们都见过跑步比赛,运动员等待裁判员发令枪响,然后运动员起跑,等所有远动员跑到终点了,裁判员就计算名次。

    package cn.seven.countdownlatch;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * ClassName:    Demo2
     * Package:    cn.seven.countdownlatch
     * Description: 模拟运动员比赛,发令枪响运动员开始起跑,等待所有运动员跑完,统计名次
     * Datetime:    2020/5/13   21:14
     *
     * @Author: charon
     */
    public class Demo2 {
    
        /**
         * 执行
         * @param args
         */
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //响枪的栅栏
            final CountDownLatch countDownLatch1 = new CountDownLatch(1);
            //比赛结束的栅栏
            final CountDownLatch countDownLatch2 = new CountDownLatch(3);
    
            for (int i = 0;i< 3;i++){
                Runnable runnable = () -> {
                    try {
                        System.out.println("运动员"+Thread.currentThread().getName()+"等待信号枪");
                        // 跑之前阻塞线程,等到countDownLatch1的count为0开跑
                        countDownLatch1.await();
                        System.out.println("运动员"+Thread.currentThread().getName()+"开跑");
                        Thread.sleep(10);
                        System.out.println("运动员"+Thread.currentThread().getName()+"到达终点");
    
                        countDownLatch2.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
                executorService.execute(runnable);
            }
            try {
                Thread.sleep(5000);
    
                System.out.println("裁判"+Thread.currentThread().getName()+"即将鸣信号枪");
                //递归减一的操作,直到count为0
                countDownLatch1.countDown();
                System.out.println("裁判"+Thread.currentThread().getName()+"鸣响信号枪,等待运动员跑完");
                //等待countDownLatch2 的count减为0,才能继续执行后面的代码
                countDownLatch2.await();
                System.out.println("运动员已经跑到终点,裁判"+Thread.currentThread().getName()+"统计名次");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.shutdown();
        }
    
    }
    
    

    3. 下面就来分析一下CountDownLatch的两个重要方法吧!!

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
    	this.sync = new Sync(count);
    }
    
    Sync(int count) {
          setState(count);
    }
    

    这是CountDownLatch的构造器,需要设置一个初始大小,即线程个数。如果count小于0,直接抛出异常。否则就将构造器中的count传递给AQS的state。

    所以CountDownLatch中的countDown()就是对state状态的改变。await()是通过轮询state的状态来判断所有的任务是否都完成。

    countDown源码分析:

    当前线程调用了该方法后,会递减计数器的值,递减后如果计数器为 0 则会唤醒所有调用await 方法而被阻塞的线程,否则什么都不做。

    public void countDown() {
         sync.releaseShared(1);//递减锁的技术,如果count为0,就释放锁,如果count大于0,就count减一
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {  //尝试释放锁,这个方法在sync中重写,如果count的值为0,则执行下面的操作
            doReleaseShared();  
            return true;
        }
        return false;
    }
    
    /**
      * CountDownLatch的内部类sync重写的这个尝试释放锁的方法
      */
    protected boolean tryReleaseShared(int releases) {
        //  递减计数;转换为零时的信号
        for (;;) { //使用死循环来尝试释放锁,当前线程成功完成cas使计数值(状态值state)减一并更新到state
            int c = getState();
            if (c == 0) //如果count等于0,则退出,为了防止计数器值为 0 后,其他线程又调用了countDown方法,如果没有判断,状态值就会变成负数。
                return false;
            int nextc = c-1; //每执行一次,count 减一
            if (compareAndSetState(c, nextc)) //利用cas机制来更新state得状态,调用unsafe.compareAndSwapInt()操作内存,如果当前状态值等于预期值,原子地将同步状态设置为给定的已更新值
                return nextc == 0; // 更新成功就返回
        }
    }
    
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//指示后续线程需要断开连接
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // 循环复查
                    unparkSuccessor(h);//唤醒后续节点
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    await源码分析:

    当前线程调用了CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一才会返回:

    (1)当所有线程都调用了CountDownLatch对象的countDown方法后,也就是说计时器值为 0 的时候。

    (2)其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常后返回。

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//尝试看当前count是否为0,为0则直接返回,否者进入AQS的队列等待
            doAcquireSharedInterruptibly(arg);
    }
    
    /**
     * CountDownLatch的内部类sync重写的这个方法
     */
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;//state不等于0将会返回-1,进入上面那个方法加入AQS队列等待
    }
    
    //AQS等待队列,使用的乐观锁获得共享资源
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);//addWaiter为AQS的加入队尾
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();//获取前一个节点
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            //设置队列头,并检查后续进程是否可能在共享模式下等待,如果是这样,则在设置了propagate>0或propagate status时进行传播。
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //检查并修改一个节点的状态,当该节点获取锁失败时。返回true如果线程需要阻塞,并挂起当前线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)//如果是非正常退出的话,取消获取
                    cancelAcquire(node);
            }
        }
    

    参考网址:

    https://www.cnblogs.com/huangjuncong/p/9275634.html
    http://ifeve.com/countdownlatch源码解析/

  • 相关阅读:
    DEV—【GridControl添加按钮列】
    DEV—【GridControl主从表】
    DEV—【GridControl 按钮列无法触发点击事件解决方案】
    WCF入门大致思路
    .Net应用导入、导出Excel文件
    .Net应用自定义鼠标样式
    VS打包后生成快捷方式:目标指向错误、Icon图标分辨率有误问题解决方案
    在Paint事件中绘制控件(边框)
    Android SDK Manager下载失败后 更新列表的正确设置
    关于sql分页
  • 原文地址:https://www.cnblogs.com/pluto-charon/p/12887902.html
Copyright © 2011-2022 走看看