zoukankan      html  css  js  c++  java
  • [置顶] JDK-CountDownLatch-实例、源码和模拟实现

    Conception

    A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
    根据CountDownLatch的document,就是说CountDownLatch作为一个同步的助手,可以阻塞一个线程,等待另一个线程结束了再执行。
    所以CountDownLatch的作用在于:通过阻塞来协调线程间的执行顺序。
    CountDownLatch最重要的方法为await()和countDown()。首先,初始化指定一个count,每次countDown()都会count--,而await()就是阻塞调用它的方法,直到count==0。
    所以可以把CountDownLatch看成同步的计数器,什么时候倒数到0就执行,否则阻塞。

    Demo

    我们首先做一个CountDownLatch的demo,看看它是怎么一个阻塞。
    count初始化为2,所以count.await()会阻塞主线程,直到两个processor都countDown(),即count==0的时候才会执行,打印出"Processors has been done."

    public class CountDownLatchTest {
    	public static void main(String[] args) throws InterruptedException {
    		CountDownLatch count = new CountDownLatch(2);
    		ExecutorService exc = Executors.newSingleThreadExecutor();
    		
    		System.out.println("Before submit the processor.");
    		exc.submit(new CountProcessor(count, "P1"));
    		exc.submit(new CountProcessor(count, "P2"));
    		System.out.println("Has submited the processor.");
    
    		count.await();
    		System.out.println("Processors has been done.");
    		exc.shutdown();
    	}
    }
    
    class CountProcessor extends Thread {
    	private CountDownLatch count;
    	private String name;
    	public CountProcessor(CountDownLatch count, String name) {
    		this.count = count;
    		this.name = name;
    	}
    	@Override
    	public void run() {
    		try {
    			Thread.currentThread().sleep(1000);
    			System.out.println(this.name + " has been done.");
    			count.countDown();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    



    修改一下上面的程序,我们可以模拟一个百米赛跑的现场:
    首先运动员进场:前期数据准备和线程池的初始化。
    准备工作完成后统一起跑:传入的begin.await()一直阻塞着Runner.run(),在前期准备完成后,begin.countDown()后begin.count==0,阻塞结束,Runner起跑。
    比赛一直进行,直到所有选手冲过终点:end.await()会阻塞主线程,所以最后的那条print语句不会提前打印,而是等待Runner.run()结束执行end.countDown(),减少3次直到end.count==0才接触阻塞,宣布比赛结束。
    这就是CountDownLatch的经典场景,统一开始,统一结束。

    public class CountDownLatchTest {
    	public static void main(String[] args) throws InterruptedException {
    		CountDownLatch begin = new CountDownLatch(1);
    		CountDownLatch end = new CountDownLatch(3);
    		ExecutorService exc = Executors.newCachedThreadPool();
    		
    		System.out.println("Runners are comming.");
    		exc.submit(new CountProcessor(begin, end, "Runner_1", 1000));
    		exc.submit(new CountProcessor(begin, end, "Runner_2", 2000));
    exc.submit(new CountProcessor(begin, end, "Runner_3", 3000));
    		System.out.println("Ready.");
    		
    		Thread.currentThread().sleep(1000);
    		System.out.println("Go.");
    		begin.countDown();
    		
    		end.await();
    		System.out.println("All runners Finish the match.");
    		exc.shutdown();
    	}
    }
    
    class CountProcessor extends Thread {
    	private CountDownLatch beginCount;
    	private CountDownLatch endCount;
    	private String name;
    	private int runningTime;
    	public CountProcessor(CountDownLatch beginCount, CountDownLatch endCount, String name, int runningTime) {
    		this.beginCount = beginCount;
    		this.endCount = endCount;
    		this.name = name;
    		this.runningTime = runningTime;
    	}
    
    	@Override
    	public void run() {
    		try {
    			this.beginCount.await();
    			System.out.println(this.name + " start.");
    			Thread.currentThread().sleep(this.runningTime);
    			System.out.println(this.name + " breast the tape.");
    			this.endCount.countDown();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    


    Source Code

    我们来看JDK中CountDownLatch是怎么定义的,以及其中的主要方法。

    public class CountDownLatch {
    public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
    }
    
    public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
    }
    
    public void countDown() {
            sync.releaseShared(1);
    }
    
    public long getCount() {
            return sync.getCount();
    }
    }
    



    很显然,CountDownLatch的逻辑都封装在内部类Sync中,这也是通过装饰者模式来提高封装性的普遍做法。
    Sync继承了AbstractQueuedSynchronizer,这里的调用关系有点麻烦,我就不一一展开了,我贴了部分关键的代码(如下)。
    概括一下,CountDownLatch的count存放在以volatile声明的变量state。在await()的时候for轮询state,一直轮询以达到阻塞的效果,直到state==0。而countDown()就是通过CompareAndSet的方式来递减state。

    private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    }
    
    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    private volatile int state;
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
    }
    }
    


    Simulator

    从上面可以看出来,其实CountDownLatch的实现很简单,countDown的时候原子修改,await的时候循环判断volatile来阻塞。
    所以完全可以通过原子类来实现,而且还更加beautiful。以下,我用AtomicInteger来模拟CountDownLatch的实现。
    测试程序还是和上面的赛跑程序一样的,只是把CountDownLatch替换成了CountDownLatchSimulator。
    最后测试结果还是一样的,只是过程简单了许多,因为JDK把很多防止冲突的逻辑都封装在原子类了。其中await的时候,可以在循环中加上sleep,能减低系统轮询的消耗。
    我只是做了一个简单的实现,至于性能和安全性方面,还希望大神们指导大笑

    public class CountDownLatchSimulator {
    	private AtomicInteger count;
    	public CountDownLatchSimulator(int count) {
    		this.count = new AtomicInteger(count);
    	}
    	
    	public void await() throws InterruptedException {
    		while (this.count.get() != 0) {
    //			Thread.currentThread().sleep(100);
    		}
    	}
    	
    	public void countDown() {
    		this.count.getAndDecrement();
    	}
    	
    	public static void main(String[] args) throws InterruptedException {
    		CountDownLatchSimulator begin = new CountDownLatchSimulator(1);
    		CountDownLatchSimulator end = new CountDownLatchSimulator(3);
    		ExecutorService exc = Executors.newCachedThreadPool();
    		
    		System.out.println("Runners are comming.");
    		exc.submit(new CountProcessor2(begin, end, "Runner_1", 1000));
    		exc.submit(new CountProcessor2(begin, end, "Runner_2", 2000));
    		exc.submit(new CountProcessor2(begin, end, "Runner_3", 3000));
    		System.out.println("Ready.");
    		
    		Thread.currentThread().sleep(2000);
    		System.out.println("Go.");
    		begin.countDown();
    		
    		end.await();
    		System.out.println("All runners Finish the match.");
    		exc.shutdown();
    	}
    }
    
    class CountProcessor2 extends Thread {
    	
    	private CountDownLatchSimulator beginCount;
    	private CountDownLatchSimulator endCount;
    	private String name;
    	private int runningTime;
    	public CountProcessor2(CountDownLatchSimulator beginCount, CountDownLatchSimulator endCount, String name, int runningTime) {
    		this.beginCount = beginCount;
    		this.endCount = endCount;
    		this.name = name;
    		this.runningTime = runningTime;
    	}
    	
    	@Override
    	public void run() {
    		try {
    			this.beginCount.await();
    			System.out.println(this.name + " start.");
    			Thread.currentThread().sleep(this.runningTime);
    			System.out.println(this.name + " breast the tape.");
    			this.endCount.countDown();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
  • 相关阅读:
    洛谷 P2260 [清华集训2012]模积和 || bzoj2956
    Mass Change Queries Codeforces
    Single-use Stones Codeforces
    洛谷 P4503 [CTSC2014]企鹅QQ
    洛谷 P1463 [HAOI2007]反素数
    Bear and Tower of Cubes Codeforces
    洛谷 P1593 因子和 || Sumdiv POJ
    记一次服务器inodes数报警的事件
    MySQL参数详解
    Django基础流程
  • 原文地址:https://www.cnblogs.com/pangblog/p/3323187.html
Copyright © 2011-2022 走看看