zoukankan      html  css  js  c++  java
  • 并发工具类(一)等待多线程的CountDownLatch

    前言

      JDK中为了处理线程之间的同步问题,除了提供锁机制之外,还提供了几个非常有用的并发工具类:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
      CountDownLatch、CyclicBarrier、Semphore、Phaser 这四个工具类提供一种并发流程的控制手段;而Exchanger工具类则提供了在线程之间交换数据的一种手段。

    简介

      CountDownLatch 允许一个或多个线程等待其他线程完成操作。单词Latch的意思是“门闩”,所以没有打开时,N个人是不能进入屋内的,也就是N个线程是不能往下执行的,从而控制线程执行任务的时机,使线程以“组团”的方式一起执行任务。
      CountDownLatch 类 在创建时,给定一个计数count。线程调用CountDownLatch 对象的awiat( )方法时,判断这个计数count是否为0,如果不为0,就进入等待状态。其他线程在完成一定任务时,调用CountDownLatch 的countDown()方法,使计数count减一。直到count的值等于0或者少于0时,便是等待线程的运行时机,将会继续往下运行。

    CountDownLatch的API接口

    方法名称 描 述
    void await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
    boolean await(long timeout, TimeUnit unit) 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
    void countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
    long getCount() 返回当前计数。
    String toString() 返回标识此锁存器及其状态的字符串。

    注意: await()也可以被多个线程同时调用,从而实现多个线程 等待其他的多个线程完成某部分操作。

    下面是API文档介绍的两个经典用法:

    @ Example1

    Driver类中创建了一组worker 线程,所有的worker线程必须等待Driver类完成初始化动作,才能往下运行。完成初始化动作后,Driver类也必须等待所有worker线程完成才能结束。本例子中使用了两个CountDownLatch类:

    • startSignal是一个启动信号,在 driver 为继续执行 worker 做好准备之前,它会阻止所有的 worker 继续执行。
    • doneSignal是一个完成信号,它允许 driver 在完成所有 worker 之前一直等待。
    class Driver { // ...
       void main() throws InterruptedException {
         CountDownLatch startSignal = new CountDownLatch(1);
         CountDownLatch doneSignal = new CountDownLatch(N);
    
         for (int i = 0; i < N; ++i) // create and start threads
           new Thread(new Worker(startSignal, doneSignal)).start();
    
         doSomethingElse();            // don't let run yet
         startSignal.countDown();      // let all threads proceed
         doSomethingElse();
         doneSignal.await();           // wait for all to finish
       }
     }
    
     class Worker implements Runnable {
       private final CountDownLatch startSignal;
       private final CountDownLatch doneSignal;
       Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
          this.startSignal = startSignal;
          this.doneSignal = doneSignal;
       }
       public void run() {
          try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
    } catch (InterruptedException ex) {} // return;
       }
    
       void doWork() { ... }
     }
    
    

    @ Example2:另一种典型用法是,将一个问题分成 N 个部分(N个小任务),然后将这些任务Runnable交由线程池来完成,每个子任务执行完成,就计数一次,主线程则等待这些子任务完成。当所有的子部分完成后,主线程就能够通过 await。(当线程必须用这种方法反复倒计数时,可改为使用 CyclicBarrier。)

    class Driver2 { // ...
       void main() throws InterruptedException {
         CountDownLatch doneSignal = new CountDownLatch(N);
         Executor e = ...
    
         for (int i = 0; i < N; ++i) // create and start threads
           e.execute(new WorkerRunnable(doneSignal, i));
    
         doneSignal.await();           // wait for all to finish
       }
     }
    
     class WorkerRunnable implements Runnable {
       private final CountDownLatch doneSignal;
       private final int i;
       WorkerRunnable(CountDownLatch doneSignal, int i) {
          this.doneSignal = doneSignal;
          this.i = i;
       }
       public void run() {
          try {
            doWork(i);
            doneSignal.countDown();
          } catch (InterruptedException ex) {} // return;
       }
    
       void doWork() { ... }
     }
    

    下面的内容是 转载自并发编程网 – ifeve.com,文章地址:

    并发工具类(一)等待多线程完成的CountDownLatch

    应用场景

      假如有这样一个需求,当我们需要解析一个Excel里多个sheet的数据时,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join。代码如下:

    public class JoinCountDownLatchTest {
    
    	public static void main(String[] args) throws InterruptedException {
    		Thread parser1 = new Thread(new Runnable() {
    			@Override
    			public void run() {
    			}
    		});
    
    		Thread parser2 = new Thread(new Runnable() {
    			@Override
    			public void run() {
    				System.out.println("parser2 finish");
    			}
    		});
    
    		parser1.start();
    		parser2.start();
    		parser1.join();
    		parser2.join();
    		System.out.println("all parser finish");
    	}
    
    }
    

      join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远wait,代码片段如下,wait(0)表示永远等待下去。

    while (isAlive()) {
     wait(0);
    }
    

      直到join线程中止后,线程的this.notifyAll会被调用,调用notifyAll是在JVM里实现的,所以JDK里看不到,有兴趣的同学可以看看JVM源码。JDK不推荐在线程实例上使用wait,notify和notifyAll方法。


      而在JDK1.5之后的并发包中提供的CountDownLatch也可以实现join的这个功能,并且比join的功能更多。
    <pre>public class CountDownLatchTest {
    
    	static CountDownLatch c = new CountDownLatch(2);
    
    	public static void main(String[] args) throws InterruptedException {
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				System.out.println(1);
    				c.countDown();
    				System.out.println(2);
    				c.countDown();
    			}
    		}).start();
    
    		c.await();
    		System.out.println("3");
    	}
    
    }
    

    CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。

    当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,你只需要把这个CountDownLatch的引用传递到线程里。

    其他方法:

    如果有某个解析sheet的线程处理的比较慢,我们不可能让主线程一直等待,所以我们可以使用另外一个带指定时间的await方法,await(long time, TimeUnit unit): 这个方法等待特定时间后,就会不再阻塞当前线程。join也有类似的方法。

    注意:

    • 计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。
    • 一个线程调用countDown方法 happen-before 另外一个线程调用await方法。

    CountDownLatch 的源码分析

    最后,我们简单看一下 CountDownLatch是怎么实现的:

    public class CountDownLatch {
     private final Sync sync;
    
     public CountDownLatch(int count) {//构造器
             //count少于0将抛出异常
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
    public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
     public void countDown() {
            sync.releaseShared(1);
        }
    //........
    }
    

    在创建countDownLatch,其构造器里面创建了一个sync类,并且await()countDown方法都是都是通过此类来实现的。

    private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
              //设置state的值为countDownLatch的计数的数目
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            //如果state值为0.也就是计数完成了,就不可以再获取共享锁,这也是为什么CountLatch只能用一次
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
           
           //是否可以释放共享锁
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1; //状态state减一
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;//计数到0了,表示释放锁成功。
                }
            }
        }
    

    与大部分的并发工具类一样,都是继承使用了JDK提供的强大的AQS框架类AbstractQueuedSynchronizer,而且使用的还是共享锁,共享锁能允许线程进入的线程数目,就是CountDownLatch传入的参数。

  • 相关阅读:
    每日构建(三)
    asp.net mvc(九)
    表达式树对性能的影响
    asp.net mvc(八)
    31天重构指南之六:降低字段
    使用OPENROWSET将数据从excel导入到sql server
    31天重构指南之三: 提升方法(pull up )
    31天重构指南之一:封装集合
    31天重构指南之七:重命名
    职场杂谈之由仲秋福利想到的
  • 原文地址:https://www.cnblogs.com/jinggod/p/8492067.html
Copyright © 2011-2022 走看看