前面分享了CountDownLatch的用法,但是由于分享过程中,发现有些朋友,问我Future与CountDownLatch的有什么区别?
答案:只是concurrent包下的并发帮助工具类,两者并没有什么联系;对于CountDownLatch是关注与子线程的执行完毕情况,而Future是Callable执行call回调包装的返回值;
Runnable是执行工作的独立任务,但是它不返回任何值,如果希望任务完成时能够返回一个值,那么可以实现Callable接口,而不是实现Runnable接口,在Java 1.5中引入Callable是一种具有类型参数的泛型,它的类型参数,是从call()函数中获取到的,而不是run()方法,并且必须使用ExecutorService.submit()方法调用它;
package demo.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * futurn使用demo * * @author bqcoder * @version $Id: FuturnDemo.java, v 0.1 2016年11月22日 下午9:07:13 bqcoder Exp $ */ public class FutureDemo { public static void main(String[] args) { List<Future<String>> taskResults = new ArrayList<Future<String>>(); //创建线程池,使用future必须要使用executors.submit来调用,《乌龟的屁股,规定》 ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { Future<String> result = executor.submit(new TaskWithResult(i)); taskResults.add(result); } //获取执行结果 for (int i = 0; i < 10; i++) { try { System.out.println(taskResults.get(i).get()); } catch (InterruptedException e) { } catch (ExecutionException e) { } } } } class TaskWithResult implements Callable<String> { private int taskId; TaskWithResult(int taskId) { this.taskId = taskId; } @Override public String call() throws Exception { return "执行结果:任务taskId=" + taskId; } }
运行结果:
执行结果:任务taskId=0 执行结果:任务taskId=1 执行结果:任务taskId=2 执行结果:任务taskId=3 执行结果:任务taskId=4 执行结果:任务taskId=5 执行结果:任务taskId=6 执行结果:任务taskId=7 执行结果:任务taskId=8 执行结果:任务taskId=9
submit()方法会产生Futurn对象,它用Callable返回结果的特定类型进行了参数化,可以使用Future.isDone()来判断Future查询是否完成,如果完成则返回ture,获取值通过get()方法进行获取,如果查询值还没有完成,则进入阻塞状态。
Future模式的核心在于:去除主线程的等待时间,将等待时间可以去处理其他复杂的业务逻辑。
Future模式有点类似于商品订单。在网上购物时,提交订单后,在收货的这段时间里无需一直在家里等候,可以先干别的事情。类推到程序设计中时,当提交请求时,期望得到答复时,如果这个答复可能很慢。传统的时一直等待到这个答复收到时再去做别的事情,但如果利用Future设计模式就无需等待答复的到来,在等待答复的过程中可以干其他事情。
附源码分析:
public class FutureTask<V> implements RunnableFuture<V> { /** 所有的方法全部委托sync */ private final Sync sync; public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); sync = new Sync(callable); } public FutureTask(Runnable runnable, V result) { sync = new Sync(Executors.callable(runnable, result)); } public boolean isCancelled() { return sync.innerIsCancelled(); } public boolean isDone() { return sync.innerIsDone(); } public boolean cancel(boolean mayInterruptIfRunning) { return sync.innerCancel(mayInterruptIfRunning); } public V get() throws InterruptedException, ExecutionException { return sync.innerGet(); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return sync.innerGet(unit.toNanos(timeout)); } protected void done() { } protected void set(V v) { sync.innerSet(v); } protected void setException(Throwable t) { sync.innerSetException(t); } public void run() { sync.innerRun(); } protected boolean runAndReset() { return sync.innerRunAndReset(); } private final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7828117401763700385L; /** State value representing that task is ready to run */ /** 代表起始状态 */ private static final int READY = 0; /** State value representing that task is running */ /** 代表正在运行中状态 */ private static final int RUNNING = 1; /** State value representing that task ran */ /** 代表运行完成的状态 */ private static final int RAN = 2; /** State value representing that task was cancelled */ /** 代表被取消的状态 */ private static final int CANCELLED = 4; /** The underlying callable */ private final Callable<V> callable; /** The result to return from get() */ private V result; /** The exception to throw from get() */ private Throwable exception; /** * The thread running task. When nulled after set/cancel, this * indicates that the results are accessible. Must be * volatile, to ensure visibility upon completion. */ private volatile Thread runner; Sync(Callable<V> callable) { this.callable = callable; } /** * 判断是否完成或者是否取消 * 传入0或者1 都返回0 说明任务没有完成 也没有取消 */ private boolean ranOrCancelled(int state) { return (state & (RAN | CANCELLED)) != 0; } /** * AbstractQueuedSynchronizer的模板方法 * 返回1可以获取锁 返回-1说明获取锁失败 * 调用innerIsDone 返回TRUE 说明任务已经执行完毕 * 返回FALSE 说明任务没有执行完毕 */ protected int tryAcquireShared(int ignore) { return innerIsDone() ? 1 : -1; } /** * 释放锁 将执行当前任务的线程设置为null */ protected boolean tryReleaseShared(int ignore) { runner = null; return true; } //判断任务是否被取消 boolean innerIsCancelled() { return getState() == CANCELLED; } //判断任务是否完成(取消也算完成) boolean innerIsDone() { return ranOrCancelled(getState()) && runner == null; } //获取结果 V innerGet() throws InterruptedException, ExecutionException { //首先调用AbstractQueuedSynchronizer的方法,这个方法会调用子类方法tryAcquireShared 上面有讲 //如果当前任务已经完成,那么当前线程可以向下运行,否则把当前线程加入队列阻塞. acquireSharedInterruptibly(0); //判断状态 如果取消了就抛CancellationException异常. if (getState() == CANCELLED) throw new CancellationException(); //如果任务执行过程中出现异常,这里包装一下抛出ExecutionException. if (exception != null) throw new ExecutionException(exception); return result; } //获取结果 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { //调用AbstractQueuedSynchronizer里的方法 // return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout); // 首先tryAcquireShared调用它获取锁,也就是看任务完事没,如果任务完事了就返回TRUE,那么执行逻辑同上。 // 如果获取不到锁,那么就阻塞当前线程给定的时间,如果时间到了再次任务还没完成则抛出异常。 if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } void innerSet(V v) { for (;;) { int s = getState(); if (s == RAN) return; if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; } //正常完成 设置状态为RAN if (compareAndSetState(s, RAN)) { result = v; releaseShared(0); done(); //通知子类 return; } } } void innerSetException(Throwable t) { for (;;) { int s = getState(); if (s == RAN) return; if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; } //设置异常 if (compareAndSetState(s, RAN)) { exception = t; releaseShared(0); done();//通知子类 return; } } } //取消任务 boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) { int s = getState(); //如果任务已经结束,则返回FALSE if (ranOrCancelled(s)) return false; //设置任务的状态为CANCELLED if (compareAndSetState(s, CANCELLED)) break; } //如果参数mayInterruptIfRunning=TRUE,那么设置线程的终端状态 if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); } //释放锁 releaseShared(0); //调用子类方法,通知状态改变 done(); return true; } void innerRun() { //如果任务不是初始状态则直接结束 if (!compareAndSetState(READY, RUNNING)) return; runner = Thread.currentThread(); if (getState() == RUNNING) { // recheck after setting thread V result; try { result = callable.call(); } catch (Throwable ex) { //我们写的任务方法里如果出现异常则调用setException setException(ex); return; } //设置结果 set(result); } else { //释放锁 releaseShared(0); // cancel } } boolean innerRunAndReset() { if (!compareAndSetState(READY, RUNNING)) return false; try { runner = Thread.currentThread(); if (getState() == RUNNING) callable.call(); // don't set result runner = null; return compareAndSetState(RUNNING, READY); } catch (Throwable ex) { setException(ex); return false; } } } }
参考资料:
1、【Thinking in Java】
2、【源码分析】http://blog.csdn.net/kobejayandy/article/details/46293927
3、【Java多线程编程中Future模式的详解】http://www.2cto.com/kf/201411/351903.html