zoukankan      html  css  js  c++  java
  • 多线程学习笔记六之并发工具类CountDownLatch和CyclicBarrier

    简介

      在编写多线程程序时,难免需要对并发流程进行控制,Thread类有join()和yield()等方法,JUC提供了更为灵活的并发工具类,下面就学习这些工具类的用法以及实现。

    CountDownLatch

      latch意思是门闩,countdown指从上往下数,CountDownLatch允许一个或多个线程等待其他任务线程完成操作,就像它的字面意思:从大往小数,数到某个值(0)的时候打开门闩。下面是CountDownLatch的api:

    	//构造器
        public CountDownLatch(int count);
    
    	//调用await()方法的线程会进入等待状态,它会等待直到count值为0才继续执行
        public void await();  
    	//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
        public boolean await(long timeout, TimeUnit unit);
    	//计数器减一
        public void countDown()
    

    可以看到通过构造器构造一个计数器,通过调用countDown方法计数减小,await在计数器大于0时线程处于等待状态,通过下面例子可以学会CountDownLatch的用法:

    示例

    public class LatchTest {
        public static void main(String[] args) {
    		//两个线程,计数器传入2
            final CountDownLatch latch = new CountDownLatch(2);
    
    		//这两个线程执行了latch.countDown(),计数器归0,主线程才被唤醒继续执行
            new Thread(() -> {
                try {
                    System.out.println("子线程1: "+Thread.currentThread().getName()+"正在执行");
                    Thread.sleep(3000);
                    System.out.println("子线程1: "+Thread.currentThread().getName()+"执行完毕");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            new Thread(() -> {
                try {
                    System.out.println("子线程2: "+Thread.currentThread().getName()+"正在执行");
                    Thread.sleep(3000);
                    System.out.println("子线程2: "+Thread.currentThread().getName()+"执行完毕");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            try {
                System.out.println("等待2个子线程执行完毕...");
                latch.await();
                System.out.println("2个子线程已经执行完毕");
                System.out.println("继续执行主线程");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    运行结果:

    实现分析

      CountDownLatch是基于共享锁实现的,内部类Sync继承同步器AQS,重点分析CountDownLatch以下三个方法:

    构造方法

      通过构造函数传入的参数count设置同步状态(count必须大于0,否则抛出异常),同步状态在这里并不表示线程获得锁的重入次数,而是表示一个计数器,计数器的大小与任务线程的数目是一致的,

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        Sync(int count) {
            setState(count);
        }
    

    await()

      调用了await的线程会处于等待状态,直到计数器归0才会被唤醒。await方法调用了Sync父类AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly首先检查线程有中断,然后调用tryAcquireShared尝试获取共享锁,获取成功返回1,失败返回-1,若失败调用doAcquireSharedInterruptibly将当前线程加入同步队列阻塞住,等待计数器为0唤醒。

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
    

    countDown()

      countDown方法将计数器减一,调用了AQS的releaseShared方法,当tryReleaseShared方法返回true执行doReleaseShared方法,这个方法在分析读写锁是介绍过了,就是唤醒同步等列等待获取锁的线程,即唤醒调用了await方法等待计数器归0的线程。

        public void countDown() {
            sync.releaseShared(1);
        }
    
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    • tryReleaseShared(int releases)
        通过循环+CAS的方式修改同步状态state,当同步状态为0时返回true;同步状态为0,即表示计数器归0,所有调用了countDown的线程都执行完了,可以唤醒调用await等待的线程了。
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    

    CountDownLatch与Thread.join()

      Thread类的join方法与CountDownLatch作用类似,join方法的实现原理不停检查调用join的线程是否存活,如果存活则让当前线程处于等待状态,当join线程终止后,会唤醒当前线程。CountDownLatch与join相比更灵活,不必非得线程中止只要调用了countDown方法就行了,可以响应中断以及能够设置超时等功能。

    CyclicBarrier

      CyclicBarrier是指可循环使用的屏障,它可以让一组线程当他们分别达到了同步点(common barrier point)时被阻塞,直到最后一个线程到达了同步点,屏障才会开门,让所有被屏障屏蔽的线程继续运行。

    public class BarrierTest {
        public static void main(String[] args) {
            int size = 4;
            CyclicBarrier barrier  = new CyclicBarrier(size);
            for(int i=0;i<size;i++)
                new Writer(barrier).start();
        }
        static class Writer extends Thread{
            private CyclicBarrier cyclicBarrier;
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
    
            @Override
            public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+" is coming...");
                try {
                    //睡眠模拟业务操作
                    Thread.sleep(5000);      
                    System.out.println("线程"+Thread.currentThread().getName()+" is waiting on barrier");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch(BrokenBarrierException e){
                    e.printStackTrace();
                }
            }
        }
    }
    

    运行结果:

    实现分析

    类属性及构造方法

    public class CyclicBarrier {
        
    	//CyclicBarrier使用完了可以重置,每使用一次都会有一个新的Generation对象,broken表示当前屏障是否被损坏
        private static class Generation {
            boolean broken = false;
        }
    
        //重入锁
        private final ReentrantLock lock = new ReentrantLock();
        //condition实现线程等待与唤醒
        private final Condition trip = lock.newCondition();
        //表示线程数,在parties个线程都调用await方法后,barrier才算是被通过(tripped)了。
        private final int parties;
        //通过构造方法设置一个Runnable对象,用来在所有线程都到达barrier时执行。
        private final Runnable barrierCommand;
        /** The current generation */
        private Generation generation = new Generation();
    
        //count表示还剩下未到达barrier(未调用await)的线程数量
        private int count;
    
    	//构造函数
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    

    await()

      await重载的两种方法都是调用的doWait方法。

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
        public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
    • doWait(boolean timed, long nanos)
        doWait是await的核心方法,通过独占锁和Condition对象让线程阻塞等待,具体先判断当前线程是不是最后一个执行await方法的线程,如果不是,调用condition的await方法让线程等待,在这里我们看到首先线程会获得锁,进入同步块,在循环里让线程等待,这里因为当前线程获得了独占锁,它处于同步队列的head头节点之中,当调用了condition.await()方法后,当前线程从同步队列转移到条件队列,释放了独占锁,所以当前线程获取独占锁并不会影响后来的线程获取独占锁,因为当前线程进入阻塞状态已经释放了独占锁,直到被唤醒后才会去争取获得独占锁,到最后会在finally块中显示的释放。
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
    		//独占锁
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
    			//Generation对象
                final Generation g = generation;
    
    			//屏障被破坏,抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
    
    			//线程被中断
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                int index = --count;
    			//最后一个到达同步点的线程
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
    			//一直循环直到最后一个线程到达同步点、屏障破损(genneration的broken属性为true)、中断或超时
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
    					//g == generation && !g.broken说明此时当前这一轮还没结束,并且没有其它线程执行过
    					//breakBarrier方法。这种情况会执行breakBarrier置generation的broken标识为true并
    					//唤醒其它线程,之后继续抛出InterruptedException。 
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // 如果g != generation,此时这一轮已经结束,后面返回index作为到达barrier的次序;
                            // 如果g.broken说明之前已经有其它线程执行了breakBarrier方法,后面会抛出
    						//BrokenBarrierException。
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
    				//超时
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
    
    • breakBarrier()
        损坏当前屏障,会唤醒所有在屏障中的线程,当线程被中断或等待超时会调用
        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
    
    
    • nextGeneration()
        nextGeneration方法在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    
    

    CountDownLatch和CyclicBarrier区别

      从功能上说,CountDownLatch允许一个或多个线程等待其他线程完成操作,而CyclicBarrier是让一组线程达到一个公共同步点之后再一起放行;CountDownLatch计数器只能使用一次,CyclicBarrier可以使用reset方法重置用以处理某些复杂的业务场景。

  • 相关阅读:
    ProtoType Design Tools
    什么是publickeytoken及publickeytoken的作用
    Windows最高权限system帐户
    Q70 AV01本本安装MAC
    解决MDict3在PPC下乱码的问题
    QT_XSP.CPP
    QT_XSP.CPP
    SetupFactory脚本
    DVD IFO FILE HEADER
    SerialPort comstat is being used without defining
  • 原文地址:https://www.cnblogs.com/rain4j/p/10183118.html
Copyright © 2011-2022 走看看