zoukankan      html  css  js  c++  java
  • CountDownLatch应用

     
    一、应用场景
     1)主线程开启n个线程后,等待所有子线程执行完成以后,主线程对子线程的数据进行汇总,然后主线程结束即(简单说:主线程等待所有子线程执行完成后,主线程才结束)
     2)2组线程,第一组线程等待第二组线程执行完成以后才执行,即第一组线程wait,第二组线程计数器不断减少
     
    二、使用示例
     
    package com.test.lock;
    
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    
    /**
    * 主要用途:
    */
    
    
    public class CountDownLatchTest4 {
        static Logger logger = LoggerFactory.getLogger(CountDownLatchTest4.class ) ;
        static final int paralNum=10;
    
    
        public static int getRandom(){
            Random random = new Random() ;
            int rid = random.nextInt(6);
            System.out.println(String.format("生成随机数:%d", rid+2 ));
            return rid+2;
        }
    
    
        public static void main(String[] args) throws InterruptedException {
    
    
    
    
            AtomicInteger tj = new AtomicInteger();
    
    
            CountDownLatch countDownLatch = new CountDownLatch(paralNum);
    //        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    
    
    //        for (int i = 1; i <=2 ; i++) {
    //            new Thread(()->{
    //                try {
    //                    countDownLatch.await();
    //                } catch (InterruptedException e) {
    //                    e.printStackTrace();
    //                }
    //                logger.info(String.format("t %s 执行结束", Thread.currentThread().getName()));
    //
    //            },"t"+(i+20)).start(); ;
    //        }
    
    
            for (int i = 1; i <=paralNum ; i++) {
                  new Thread(()->{
                    try {
                        TimeUnit.SECONDS.sleep(getRandom());
                        tj.incrementAndGet();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
    
                    countDownLatch.countDown();
                  logger.info(String.format("t %s 执行结束", Thread.currentThread().getName()));
    
    
                },"t"+i).start(); ;
            }
            
            System.out.println("主线程结束执行等待");
            countDownLatch.await();
            System.out.println(String.format("主线程结束,汇总值为:%d", tj.get() )) ;
        }
    }
    三、源码主要方法解析
         
                从整体上看:主要是使用AQS框架,初始化的时候给Status一个值,每次countDownLatch.countDown的时候将状态值减一,当status值为0的时候唤醒所有队列上等待的线程。
     
    1) 代码1,初始化对象
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        //status状态赋一个整数值
        this.sync = new Sync(count);
    }

     2)方法2,线程等待方法

    //线程等待
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    //
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //
        if (tryAcquireShared(arg) < 0)// 如果状态==0,则不执行下面等待方法,如果status>0 则会执行下面等待方法
                  //线程入队等待
            doAcquireSharedInterruptibly(arg);
    }

     3)方法3,线程阻塞--到这为止,线程就完成了阻塞,当线程阻塞结束,线程会从这个位置继续执行

       

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //当前线程构建一个node,然后放入到队列中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获取当前节点的前一个节点
                final Node p = node.predecessor();
                 //如果前一个节点是头节点
                if (p == head) {
                    //判断status是否为==0,如果为0则线程不会阻塞,并且会唤醒前面所有等待的线程
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //唤醒前面等待的所有线程
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //阻塞线程
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

      4)线程状态释放的方法

    public void countDown() {
        sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            //唤醒等待的线程
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            //获取当前的状态
            int c = getState();
            if (c == 0)
                return false;
            //状态 -1
            int nextc = c-1;
            //状态回写
            if (compareAndSetState(c, nextc))
                //如果状态结果 =0,则返回true( 方法会唤醒所有等待的线程)  ,否则false
                return nextc == 0;
        }
    }
  • 相关阅读:
    122. Best Time to Buy and Sell Stock II
    121. Best Time to Buy and Sell Stock
    72. Edit Distance
    583. Delete Operation for Two Strings
    582. Kill Process
    indexDB基本用法
    浏览器的渲染原理
    js实现txt/excel文件下载
    git 常用命令
    nginx进入 配置目录时
  • 原文地址:https://www.cnblogs.com/lean-blog/p/13736057.html
Copyright © 2011-2022 走看看