zoukankan      html  css  js  c++  java
  • CountDownLatch应用

     
    一、应用场景
     1)主线程开启n个线程后,等待所有子线程执行完成以后,主线程对子线程的数据进行汇总,然后主线程结束即(简单说:主线程等待所有子线程执行完成后,主线程才结束)
     2)2组线程,第一组线程等待第二组线程执行完成以后才执行,即第一组线程wait,第二组线程计数器不断减少
     
    二、使用示例
     
    package com.test.lock;
    
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    
    /**
    * 主要用途:
    */
    
    
    public class CountDownLatchTest4 {
        static Logger logger = LoggerFactory.getLogger(CountDownLatchTest4.class ) ;
        static final int paralNum=10;
    
    
        public static int getRandom(){
            Random random = new Random() ;
            int rid = random.nextInt(6);
            System.out.println(String.format("生成随机数:%d", rid+2 ));
            return rid+2;
        }
    
    
        public static void main(String[] args) throws InterruptedException {
    
    
    
    
            AtomicInteger tj = new AtomicInteger();
    
    
            CountDownLatch countDownLatch = new CountDownLatch(paralNum);
    //        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    
    
    //        for (int i = 1; i <=2 ; i++) {
    //            new Thread(()->{
    //                try {
    //                    countDownLatch.await();
    //                } catch (InterruptedException e) {
    //                    e.printStackTrace();
    //                }
    //                logger.info(String.format("t %s 执行结束", Thread.currentThread().getName()));
    //
    //            },"t"+(i+20)).start(); ;
    //        }
    
    
            for (int i = 1; i <=paralNum ; i++) {
                  new Thread(()->{
                    try {
                        TimeUnit.SECONDS.sleep(getRandom());
                        tj.incrementAndGet();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
    
                    countDownLatch.countDown();
                  logger.info(String.format("t %s 执行结束", Thread.currentThread().getName()));
    
    
                },"t"+i).start(); ;
            }
            
            System.out.println("主线程结束执行等待");
            countDownLatch.await();
            System.out.println(String.format("主线程结束,汇总值为:%d", tj.get() )) ;
        }
    }
    三、源码主要方法解析
         
                从整体上看:主要是使用AQS框架,初始化的时候给Status一个值,每次countDownLatch.countDown的时候将状态值减一,当status值为0的时候唤醒所有队列上等待的线程。
     
    1) 代码1,初始化对象
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        //status状态赋一个整数值
        this.sync = new Sync(count);
    }

     2)方法2,线程等待方法

    //线程等待
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    //
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //
        if (tryAcquireShared(arg) < 0)// 如果状态==0,则不执行下面等待方法,如果status>0 则会执行下面等待方法
                  //线程入队等待
            doAcquireSharedInterruptibly(arg);
    }

     3)方法3,线程阻塞--到这为止,线程就完成了阻塞,当线程阻塞结束,线程会从这个位置继续执行

       

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //当前线程构建一个node,然后放入到队列中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获取当前节点的前一个节点
                final Node p = node.predecessor();
                 //如果前一个节点是头节点
                if (p == head) {
                    //判断status是否为==0,如果为0则线程不会阻塞,并且会唤醒前面所有等待的线程
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //唤醒前面等待的所有线程
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //阻塞线程
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

      4)线程状态释放的方法

    public void countDown() {
        sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            //唤醒等待的线程
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            //获取当前的状态
            int c = getState();
            if (c == 0)
                return false;
            //状态 -1
            int nextc = c-1;
            //状态回写
            if (compareAndSetState(c, nextc))
                //如果状态结果 =0,则返回true( 方法会唤醒所有等待的线程)  ,否则false
                return nextc == 0;
        }
    }
  • 相关阅读:
    初识 visJs (基于html5 canvas开发的可视化框架)
    VueJs
    VueJS 使用i18n做国际化切换中英文
    vue-cli项目接口地址可配置化(多环境部署)一处修改多处适用
    vue + element-ui 制作下拉菜单(可配置路由、可根据路由高亮list、可刷新自动展开定位路由)
    vue-cli -- > 项目基本构建的方法
    javascript代码工具库
    HTML5新功能之六 《Web通信、WebSockets和跨文档消息传输》
    《响应式Web设计:HTML5和CSS3实战》 读书笔记
    HTML5新功能之二 《Geolocation获取地理位置》
  • 原文地址:https://www.cnblogs.com/lean-blog/p/13736057.html
Copyright © 2011-2022 走看看