在实际开发中,碰上CPU密集且执行时间非常耗时的任务,通常我们会选择将该任务进行分割,以多线程方式同时执行若干个子任务,等这些子任务都执行完后再将所得的结果进行合并。这正是著名的map-reduce思想,不过map-reduce通常被用在分布式计算的语境下,这里举这个例子只是为了说明对多线程并发执行流程进行控制的重要性,比如某些线程必须等其他线程执行完后才能开始它的工作。使用jdk中的内置锁或者重入锁配合等待通知机制可以实现这个需求,但是会比较麻烦。因为不管是内置还是重入锁,它们关注的重点在于如何协调多线程对共享资源的访问,而不是协调特定线程的执行次序,完成复杂的并发流程控制。好在JDK在并发包下提供了CountDownLatch,CyclicBarrier,Semaphore等并发工具,可以让我们站在更高的角度思考并解决这个问题。
map-reduce:采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce就是"任务的分解与结果的汇总"。
一、CountDownLatch(计数器)
CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
构造器:
public CountDownLatch(int count) { }; //参数count为计数值
重要方法:
public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public void countDown() { }; //将count值减1
使用示例:
package com.javaBase.LineDistance; import java.util.concurrent.CountDownLatch; /** * 〈一句话功能简述〉; * 〈功能详细描述〉 * * @author jxx * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class TestCountDownLatch { public static void main(String[] args){ CountDownLatch countDownLatch = new CountDownLatch(2); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(3000); System.out.println("t1......."); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000); System.out.println("t2......."); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t2.start(); try { countDownLatch.await(); System.out.println("执行完成。。"); } catch (InterruptedException e) { e.printStackTrace(); } } }
二、CyclicBarrier(循环屏障)
它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。
构造器:参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。
public CyclicBarrier(int parties, Runnable barrierAction) { } public CyclicBarrier(int parties) { }
重要方法:
public int await() throws InterruptedException, BrokenBarrierException { }; public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };
第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。
代码实例:
package com.javaBase.LineDistance; import java.util.concurrent.CyclicBarrier; /** * 〈一句话功能简述〉; * 〈功能详细描述〉 * * @author jxx * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class TestCyclicBarrier { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for (int i = 0; i < N; i++) new Writer(barrier).start(); } static class Writer extends Thread { private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据..."); try { Thread.sleep(5000); //以睡眠来模拟写入数据操作 System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务..."); } } }
三、Semaphore(信号量)
Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
构造器:
public Semaphore(int permits) { //参数permits表示许可数目,即同时可以允许多少线程进行访问 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可 sync = (fair)? new FairSync(permits) : new NonfairSync(permits); }
重要方法:
public void acquire() throws InterruptedException { } //获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。 public void acquire(int permits) throws InterruptedException { } //获取permits个许可 public void release() { } //释放一个许可 public void release(int permits) { } //释放permits个许可 public boolean tryAcquire() { }; //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
代码实例:
package com.javaBase.LineDistance; import java.util.concurrent.Semaphore; /** * 〈一句话功能简述〉; * 〈假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现〉 * * @author jxx * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class TestSemaphore { public static void main(String[] args) { int N = 8; //工人数 Semaphore semaphore = new Semaphore(5); //机器数目 for (int i = 0; i < N; i++) new Worker(i, semaphore).start(); } static class Worker extends Thread { private int num; private Semaphore semaphore; public Worker(int num, Semaphore semaphore) { this.num = num; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println("工人" + this.num + "占用一个机器在生产..."); Thread.sleep(2000); System.out.println("工人" + this.num + "释放出机器"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
四、总结
1)CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
2)Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。