zoukankan      html  css  js  c++  java
  • Java并发编程之异步Future机制的原理和实现

    项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:

    Java代码  收藏代码
    1. import java.util.concurrent.Callable;  
    2. import java.util.concurrent.ExecutionException;  
    3. import java.util.concurrent.ExecutorService;  
    4. import java.util.concurrent.Executors;  
    5. import java.util.concurrent.Future;  
    6.   
    7. public class AddTask implements Callable<Integer> {  
    8.   
    9.     private int a,b;  
    10.       
    11.     public AddTask(int a, int b) {  
    12.         this.a = a;  
    13.         this.b = b;  
    14.     }  
    15.       
    16.     @Override  
    17.     public Integer call() throws Exception {  
    18.         Integer result = a + b;  
    19.         return result;  
    20.     }  
    21.       
    22.     public static void main(String[] args) throws InterruptedException, ExecutionException {  
    23.         ExecutorService executor = Executors.newSingleThreadExecutor();  
    24.         //JDK目前为止返回的都是FutureTask的实例  
    25.         Future<Integer> future = executor.submit(new AddTask(1, 2));  
    26.         Integer result = future.get();// 只有当future的状态是已完成时(future.isDone() = true),get()方法才会返回  
    27.     }  
    28. }  

      虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:

    Java代码  收藏代码
    1. public interface Future<V> {  
    2.     boolean cancel(boolean mayInterruptIfRunning);  
    3.     boolean isCancelled();  
    4.     boolean isDone();  
    5.     V get() throws InterruptedException, ExecutionException;  
    6.     V get(long timeout, TimeUnit unit)  
    7.         throws InterruptedException, ExecutionException, TimeoutException;  
    8. }  

     由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:

     

    Java代码  收藏代码
    1. package future;  
    2.   
    3. import java.util.concurrent.CancellationException;  
    4. import java.util.concurrent.Future;  
    5. import java.util.concurrent.TimeUnit;  
    6.   
    7. /** 
    8.  * The result of an asynchronous operation. 
    9.  *  
    10.  * @author lixiaohui 
    11.  * @param <V> 执行结果的类型参数 
    12.  */  
    13. public interface IFuture<V> extends Future<V> {   
    14.     boolean isSuccess(); // 是否成功      
    15.     V getNow(); //立即返回结果(不管Future是否处于完成状态)  
    16.     Throwable cause();  //若执行失败时的原因  
    17.         boolean isCancellable(); //是否可以取消  
    18.     IFuture<V> await() throws InterruptedException; //等待future的完成  
    19.     boolean await(long timeoutMillis) throws InterruptedException; // 超时等待future的完成  
    20.     boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;  
    21.         IFuture<V> awaitUninterruptibly(); //<span style="line-height: 1.5;">等待future的完成,不响应中断</span>  
    22.         boolean awaitUninterruptibly(long timeoutMillis);<span style="line-height: 1.5;">//超时</span><span style="line-height: 1.5;">等待future的完成,不响应中断</span>  
    23.     boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);  
    24.     IFuture<V> addListener(IFutureListener<V> l); //当future完成时,会通知这些加进来的监听器  
    25.     IFuture<V> removeListener(IFutureListener<V> l);  
    26.       
    27. }  

     

     

    接下来就一起来实现这个IFuture,在这之前要说明下Object.wait(),Object.notifyAll()方法,因为整个Future实现的原理的核心就是这两个方法.看看JDK里面的解释:

    Java代码  收藏代码
    1. public class Object {  
    2.     /** 
    3.      * Causes the current thread to wait until another thread invokes the 
    4.      * {@link java.lang.Object#notify()} method or the 
    5.      * {@link java.lang.Object#notifyAll()} method for this object. 
    6.      * In other words, this method behaves exactly as if it simply 
    7.      * performs the call {@code wait(0)}. 
    8.      * 调用该方法后,当前线程会释放对象监视器锁,并让出CPU使用权。直到别的线程调用notify()/notifyAll() 
    9.      */  
    10.     public final void wait() throws InterruptedException {  
    11.         wait(0);  
    12.     }  
    13.   
    14.     /** 
    15.      * Wakes up all threads that are waiting on this object's monitor. A 
    16.      * thread waits on an object's monitor by calling one of the 
    17.      * {@code wait} methods. 
    18.      * <p> 
    19.      * The awakened threads will not be able to proceed until the current 
    20.      * thread relinquishes the lock on this object. The awakened threads 
    21.      * will compete in the usual manner with any other threads that might 
    22.      * be actively competing to synchronize on this object; for example, 
    23.      * the awakened threads enjoy no reliable privilege or disadvantage in 
    24.      * being the next thread to lock this object. 
    25.      */  
    26.     public final native void notifyAll();  
    27. }  

     知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await()等一系列的方法时,如果Future还未完成,那么就调用future.wait() 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll()方法来唤醒之前因为调用过wait()方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):

    Java代码  收藏代码
    1. package future;  
    2.   
    3. import java.util.Collection;  
    4. import java.util.concurrent.CancellationException;  
    5. import java.util.concurrent.CopyOnWriteArrayList;  
    6. import java.util.concurrent.ExecutionException;  
    7. import java.util.concurrent.TimeUnit;  
    8. import java.util.concurrent.TimeoutException;  
    9.   
    10. /** 
    11.  * <pre> 
    12.  * 正常结束时, 若执行的结果不为null, 则result为执行结果; 若执行结果为null, 则result = {@link AbstractFuture#SUCCESS_SIGNAL} 
    13.  * 异常结束时, result为 {@link CauseHolder} 的实例;若是被取消而导致的异常结束, 则result为 {@link CancellationException} 的实例, 否则为其它异常的实例 
    14.  * 以下情况会使异步操作由未完成状态转至已完成状态, 也就是在以下情况发生时调用notifyAll()方法: 
    15.  * <ul> 
    16.  * <li>异步操作被取消时(cancel方法)</li> 
    17.  * <li>异步操作正常结束时(setSuccess方法)</li> 
    18.  * <li>异步操作异常结束时(setFailure方法)</li> 
    19.  * </ul> 
    20.  * </pre> 
    21.  *  
    22.  * @author lixiaohui 
    23.  * 
    24.  * @param <V> 
    25.  *            异步执行结果的类型 
    26.  */  
    27. public class AbstractFuture<V> implements IFuture<V> {  
    28.   
    29.     protected volatile Object result; // 需要保证其可见性  
    30.         /** 
    31.          * 监听器集 
    32.          */  
    33.     protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>();  
    34.   
    35.     /** 
    36.      * 当任务正常执行结果为null时, 即客户端调用{@link AbstractFuture#setSuccess(null)}时,  
    37.      * result引用该对象 
    38.      */  
    39.     private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal();  
    40.   
    41.     @Override  
    42.     public boolean cancel(boolean mayInterruptIfRunning) {  
    43.         if (isDone()) { // 已完成了不能取消  
    44.             return false;  
    45.         }  
    46.   
    47.         synchronized (this) {  
    48.             if (isDone()) { // double check  
    49.                 return false;  
    50.             }  
    51.             result = new CauseHolder(new CancellationException());  
    52.             notifyAll(); // isDone = true, 通知等待在该对象的wait()的线程  
    53.         }  
    54.         notifyListeners(); // 通知监听器该异步操作已完成  
    55.         return true;  
    56.     }  
    57.       
    58.     @Override  
    59.     public boolean isCancellable() {  
    60.         return result == null;  
    61.     }  
    62.       
    63.     @Override  
    64.     public boolean isCancelled() {  
    65.         return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;  
    66.     }  
    67.   
    68.     @Override  
    69.     public boolean isDone() {  
    70.         return result != null;  
    71.     }  
    72.   
    73.     @Override  
    74.     public V get() throws InterruptedException, ExecutionException {  
    75.         await(); // 等待执行结果  
    76.   
    77.         Throwable cause = cause();  
    78.         if (cause == null) { // 没有发生异常,异步操作正常结束  
    79.             return getNow();  
    80.         }  
    81.         if (cause instanceof CancellationException) { // 异步操作被取消了  
    82.             throw (CancellationException) cause;  
    83.         }  
    84.         throw new ExecutionException(cause); // 其他异常  
    85.     }  
    86.   
    87.     @Override  
    88.     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {  
    89.         if (await(timeout, unit)) {// 超时等待执行结果  
    90.             Throwable cause = cause();  
    91.             if (cause == null) {// 没有发生异常,异步操作正常结束  
    92.                 return getNow();  
    93.             }  
    94.             if (cause instanceof CancellationException) {// 异步操作被取消了  
    95.                 throw (CancellationException) cause;  
    96.             }  
    97.             throw new ExecutionException(cause);// 其他异常  
    98.         }  
    99.         // 时间到了异步操作还没有结束, 抛出超时异常  
    100.         throw new TimeoutException();  
    101.     }  
    102.   
    103.     @Override  
    104.     public boolean isSuccess() {  
    105.         return result == null ? false : !(result instanceof CauseHolder);  
    106.     }  
    107.   
    108.     @SuppressWarnings("unchecked")  
    109.     @Override  
    110.     public V getNow() {  
    111.         return (V) (result == SUCCESS_SIGNAL ? null : result);  
    112.     }  
    113.   
    114.     @Override  
    115.     public Throwable cause() {  
    116.         if (result != null && result instanceof CauseHolder) {  
    117.             return ((CauseHolder) result).cause;  
    118.         }  
    119.         return null;  
    120.     }  
    121.   
    122.     @Override  
    123.     public IFuture<V> addListener(IFutureListener<V> listener) {  
    124.         if (listener == null) {  
    125.             throw new NullPointerException("listener");  
    126.         }  
    127.         if (isDone()) { // 若已完成直接通知该监听器  
    128.             notifyListener(listener);  
    129.             return this;  
    130.         }  
    131.         synchronized (this) {  
    132.             if (!isDone()) {  
    133.                 listeners.add(listener);  
    134.                 return this;  
    135.             }  
    136.         }  
    137.         notifyListener(listener);  
    138.         return this;  
    139.     }  
    140.   
    141.     @Override  
    142.     public IFuture<V> removeListener(IFutureListener<V> listener) {  
    143.         if (listener == null) {  
    144.             throw new NullPointerException("listener");  
    145.         }  
    146.   
    147.         if (!isDone()) {  
    148.             listeners.remove(listener);  
    149.         }  
    150.   
    151.         return this;  
    152.     }  
    153.   
    154.     @Override  
    155.     public IFuture<V> await() throws InterruptedException {  
    156.         return await0(true);  
    157.     }  
    158.   
    159.       
    160.     private IFuture<V> await0(boolean interruptable) throws InterruptedException {  
    161.         if (!isDone()) { // 若已完成就直接返回了  
    162.             // 若允许终端且被中断了则抛出中断异常  
    163.             if (interruptable && Thread.interrupted()) {  
    164.                 throw new InterruptedException("thread " + Thread.currentThread().getName() + " has been interrupted.");  
    165.             }  
    166.   
    167.             boolean interrupted = false;  
    168.             synchronized (this) {  
    169.                 while (!isDone()) {  
    170.                     try {  
    171.                         wait(); // 释放锁进入waiting状态,等待其它线程调用本对象的notify()/notifyAll()方法  
    172.                     } catch (InterruptedException e) {  
    173.                         if (interruptable) {  
    174.                             throw e;  
    175.                         } else {  
    176.                             interrupted = true;  
    177.                         }  
    178.                     }  
    179.                 }  
    180.             }  
    181.             if (interrupted) {  
    182.                 // 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的,   
    183.                 // 这里重新设置以便让其它代码知道这里被中断了。  
    184.                 Thread.currentThread().interrupt();  
    185.             }  
    186.         }  
    187.         return this;  
    188.     }  
    189.       
    190.     @Override  
    191.     public boolean await(long timeoutMillis) throws InterruptedException {  
    192.         return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);  
    193.     }  
    194.       
    195.     @Override  
    196.     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {  
    197.         return await0(unit.toNanos(timeout), true);  
    198.     }  
    199.   
    200.     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {  
    201.         if (isDone()) {  
    202.             return true;  
    203.         }  
    204.   
    205.         if (timeoutNanos <= 0) {  
    206.             return isDone();  
    207.         }  
    208.   
    209.         if (interruptable && Thread.interrupted()) {  
    210.             throw new InterruptedException(toString());  
    211.         }  
    212.   
    213.         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();  
    214.         long waitTime = timeoutNanos;  
    215.         boolean interrupted = false;  
    216.   
    217.         try {  
    218.             synchronized (this) {  
    219.                 if (isDone()) {  
    220.                     return true;  
    221.                 }  
    222.   
    223.                 if (waitTime <= 0) {  
    224.                     return isDone();  
    225.                 }  
    226.   
    227.                 for (;;) {  
    228.                     try {  
    229.                         wait(waitTime / 1000000, (int) (waitTime % 1000000));  
    230.                     } catch (InterruptedException e) {  
    231.                         if (interruptable) {  
    232.                             throw e;  
    233.                         } else {  
    234.                             interrupted = true;  
    235.                         }  
    236.                     }  
    237.   
    238.                     if (isDone()) {  
    239.                         return true;  
    240.                     } else {  
    241.                         waitTime = timeoutNanos - (System.nanoTime() - startTime);  
    242.                         if (waitTime <= 0) {  
    243.                             return isDone();  
    244.                         }  
    245.                     }  
    246.                 }  
    247.             }  
    248.         } finally {  
    249.             if (interrupted) {  
    250.                 Thread.currentThread().interrupt();  
    251.             }  
    252.         }  
    253.     }  
    254.   
    255.     @Override  
    256.     public IFuture<V> awaitUninterruptibly() {  
    257.         try {  
    258.             return await0(false);  
    259.         } catch (InterruptedException e) { // 这里若抛异常了就无法处理了  
    260.             throw new java.lang.InternalError();  
    261.         }  
    262.     }  
    263.       
    264.     @Override  
    265.     public boolean awaitUninterruptibly(long timeoutMillis) {  
    266.         try {  
    267.             return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);  
    268.         } catch (InterruptedException e) {  
    269.             throw new java.lang.InternalError();  
    270.         }  
    271.     }  
    272.   
    273.     @Override  
    274.     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {  
    275.         try {  
    276.             return await0(unit.toNanos(timeout), false);  
    277.         } catch (InterruptedException e) {  
    278.             throw new java.lang.InternalError();  
    279.         }  
    280.     }  
    281.   
    282.     protected IFuture<V> setFailure(Throwable cause) {  
    283.         if (setFailure0(cause)) {  
    284.             notifyListeners();  
    285.             return this;  
    286.         }  
    287.         throw new IllegalStateException("complete already: " + this);  
    288.     }  
    289.   
    290.     private boolean setFailure0(Throwable cause) {  
    291.         if (isDone()) {  
    292.             return false;  
    293.         }  
    294.   
    295.         synchronized (this) {  
    296.             if (isDone()) {  
    297.                 return false;  
    298.             }  
    299.             result = new CauseHolder(cause);  
    300.             notifyAll();  
    301.         }  
    302.   
    303.         return true;  
    304.     }  
    305.   
    306.     protected IFuture<V> setSuccess(Object result) {  
    307.         if (setSuccess0(result)) { // 设置成功后通知监听器  
    308.             notifyListeners();  
    309.             return this;  
    310.         }  
    311.         throw new IllegalStateException("complete already: " + this);  
    312.     }  
    313.   
    314.     private boolean setSuccess0(Object result) {  
    315.         if (isDone()) {  
    316.             return false;  
    317.         }  
    318.   
    319.         synchronized (this) {  
    320.             if (isDone()) {  
    321.                 return false;  
    322.             }  
    323.             if (result == null) { // 异步操作正常执行完毕的结果是null  
    324.                 this.result = SUCCESS_SIGNAL;  
    325.             } else {  
    326.                 this.result = result;  
    327.             }  
    328.             notifyAll();  
    329.         }  
    330.         return true;  
    331.     }  
    332.   
    333.     private void notifyListeners() {  
    334.         for (IFutureListener<V> l : listeners) {  
    335.             notifyListener(l);  
    336.         }  
    337.     }  
    338.   
    339.     private void notifyListener(IFutureListener<V> l) {  
    340.         try {  
    341.             l.operationCompleted(this);  
    342.         } catch (Exception e) {  
    343.             e.printStackTrace();  
    344.         }  
    345.     }  
    346.   
    347.     private static class SuccessSignal {  
    348.   
    349.     }  
    350.   
    351.     private static final class CauseHolder {  
    352.         final Throwable cause;  
    353.   
    354.         CauseHolder(Throwable cause) {  
    355.             this.cause = cause;  
    356.         }  
    357.     }  
    358. }  

     监听接口

    public interface IFutureListener<V> {
        void operationCompleted(IFuture<V> future) throws Exception;
    }

    那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:

    Java代码  收藏代码
    1. package future.test;  
    2.   
    3. import future.IFuture;  
    4. import future.IFutureListener;  
    5.   
    6. /** 
    7.  * 延时加法 
    8.  * @author lixiaohui 
    9.  * 
    10.  */  
    11. public class DelayAdder {  
    12.       
    13.     public static void main(String[] args) {  
    14.         new DelayAdder().add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer>() {  
    15.               
    16.             @Override  
    17.             public void operationCompleted(IFuture<Integer> future) throws Exception {  
    18.                 System.out.println(future.getNow());  
    19.             }  
    20.               
    21.         });  
    22.     }  
    23.     /** 
    24.      * 延迟加 
    25.      * @param delay 延时时长 milliseconds 
    26.      * @param a 加数 
    27.      * @param b 加数 
    28.      * @return 异步结果 
    29.      */  
    30.     public DelayAdditionFuture add(long delay, int a, int b) {  
    31.         DelayAdditionFuture future = new DelayAdditionFuture();   
    32.         new Thread(new DelayAdditionTask(delay, a, b, future)).start();  
    33.         return future;  
    34.     }  
    35.       
    36.     private class DelayAdditionTask implements Runnable {  
    37.   
    38.         private long delay;  
    39.           
    40.         private int a, b;  
    41.           
    42.         private DelayAdditionFuture future;  
    43.           
    44.         public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {  
    45.             super();  
    46.             this.delay = delay;  
    47.             this.a = a;  
    48.             this.b = b;  
    49.             this.future = future;  
    50.         }  
    51.   
    52.         @Override  
    53.         public void run() {  
    54.             try {  
    55.                 Thread.sleep(delay);  
    56.                 Integer i = a + b;  
    57.                 // TODO 这里设置future为完成状态(正常执行完毕)  
    58.                 future.setSuccess(i);  
    59.             } catch (InterruptedException e) {  
    60.                 // TODO 这里设置future为完成状态(异常执行完毕)  
    61.                 future.setFailure(e.getCause());  
    62.             }  
    63.         }  
    64.           
    65.     }  
    66. }  

     

    Java代码  收藏代码
    1. package future.test;  
    2.   
    3. import future.AbstractFuture;  
    4. import future.IFuture;  
    5. //只是把两个方法对外暴露  
    6. public class DelayAdditionFuture extends AbstractFuture<Integer> {  
    7.       
    8.     @Override  
    9.     public IFuture<Integer> setSuccess(Object result) {  
    10.         return super.setSuccess(result);  
    11.     }  
    12.       
    13.     @Override  
    14.     public IFuture<Integer> setFailure(Throwable cause) {  
    15.         return super.setFailure(cause);  
    16.     }  
    17.       
    18. }  

      可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。

            项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:

    Java代码  收藏代码
    1. import java.util.concurrent.Callable;  
    2. import java.util.concurrent.ExecutionException;  
    3. import java.util.concurrent.ExecutorService;  
    4. import java.util.concurrent.Executors;  
    5. import java.util.concurrent.Future;  
    6.   
    7. public class AddTask implements Callable<Integer> {  
    8.   
    9.     private int a,b;  
    10.       
    11.     public AddTask(int a, int b) {  
    12.         this.a = a;  
    13.         this.b = b;  
    14.     }  
    15.       
    16.     @Override  
    17.     public Integer call() throws Exception {  
    18.         Integer result = a + b;  
    19.         return result;  
    20.     }  
    21.       
    22.     public static void main(String[] args) throws InterruptedException, ExecutionException {  
    23.         ExecutorService executor = Executors.newSingleThreadExecutor();  
    24.         //JDK目前为止返回的都是FutureTask的实例  
    25.         Future<Integer> future = executor.submit(new AddTask(12));  
    26.         Integer result = future.get();// 只有当future的状态是已完成时(future.isDone() = true),get()方法才会返回  
    27.     }  
    28. }  

      虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:

    Java代码  收藏代码
    1. public interface Future<V> {  
    2.     boolean cancel(boolean mayInterruptIfRunning);  
    3.     boolean isCancelled();  
    4.     boolean isDone();  
    5.     V get() throws InterruptedException, ExecutionException;  
    6.     V get(long timeout, TimeUnit unit)  
    7.         throws InterruptedException, ExecutionException, TimeoutException;  
    8. }  

     由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:

     

    Java代码  收藏代码
    1. package future;  
    2.   
    3. import java.util.concurrent.CancellationException;  
    4. import java.util.concurrent.Future;  
    5. import java.util.concurrent.TimeUnit;  
    6.   
    7. /** 
    8.  * The result of an asynchronous operation. 
    9.  *  
    10.  * @author lixiaohui 
    11.  * @param <V> 执行结果的类型参数 
    12.  */  
    13. public interface IFuture<V> extends Future<V> {   
    14.     boolean isSuccess(); // 是否成功      
    15.     V getNow(); //立即返回结果(不管Future是否处于完成状态)  
    16.     Throwable cause();  //若执行失败时的原因  
    17.         boolean isCancellable(); //是否可以取消  
    18.     IFuture<V> await() throws InterruptedException; //等待future的完成  
    19.     boolean await(long timeoutMillis) throws InterruptedException; // 超时等待future的完成  
    20.     boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;  
    21.         IFuture<V> awaitUninterruptibly(); //<span style="line-height: 1.5;">等待future的完成,不响应中断</span>  
    22.         boolean awaitUninterruptibly(long timeoutMillis);<span style="line-height: 1.5;">//超时</span><span style="line-height: 1.5;">等待future的完成,不响应中断</span>  
    23.     boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);  
    24.     IFuture<V> addListener(IFutureListener<V> l); //当future完成时,会通知这些加进来的监听器  
    25.     IFuture<V> removeListener(IFutureListener<V> l);  
    26.       
    27. }  

     

     

    接下来就一起来实现这个IFuture,在这之前要说明下Object.wait(),Object.notifyAll()方法,因为整个Future实现的原理的核心就是这两个方法.看看JDK里面的解释:

    Java代码  收藏代码
    1. public class Object {  
    2.     /** 
    3.      * Causes the current thread to wait until another thread invokes the 
    4.      * {@link java.lang.Object#notify()} method or the 
    5.      * {@link java.lang.Object#notifyAll()} method for this object. 
    6.      * In other words, this method behaves exactly as if it simply 
    7.      * performs the call {@code wait(0)}. 
    8.      * 调用该方法后,当前线程会释放对象监视器锁,并让出CPU使用权。直到别的线程调用notify()/notifyAll() 
    9.      */  
    10.     public final void wait() throws InterruptedException {  
    11.         wait(0);  
    12.     }  
    13.   
    14.     /** 
    15.      * Wakes up all threads that are waiting on this object's monitor. A 
    16.      * thread waits on an object's monitor by calling one of the 
    17.      * {@code wait} methods. 
    18.      * <p> 
    19.      * The awakened threads will not be able to proceed until the current 
    20.      * thread relinquishes the lock on this object. The awakened threads 
    21.      * will compete in the usual manner with any other threads that might 
    22.      * be actively competing to synchronize on this object; for example, 
    23.      * the awakened threads enjoy no reliable privilege or disadvantage in 
    24.      * being the next thread to lock this object. 
    25.      */  
    26.     public final native void notifyAll();  
    27. }  

     知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await()等一系列的方法时,如果Future还未完成,那么就调用future.wait() 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll()方法来唤醒之前因为调用过wait()方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):

    Java代码  收藏代码
    1. package future;  
    2.   
    3. import java.util.Collection;  
    4. import java.util.concurrent.CancellationException;  
    5. import java.util.concurrent.CopyOnWriteArrayList;  
    6. import java.util.concurrent.ExecutionException;  
    7. import java.util.concurrent.TimeUnit;  
    8. import java.util.concurrent.TimeoutException;  
    9.   
    10. /** 
    11.  * <pre> 
    12.  * 正常结束时, 若执行的结果不为null, 则result为执行结果; 若执行结果为null, 则result = {@link AbstractFuture#SUCCESS_SIGNAL} 
    13.  * 异常结束时, result为 {@link CauseHolder} 的实例;若是被取消而导致的异常结束, 则result为 {@link CancellationException} 的实例, 否则为其它异常的实例 
    14.  * 以下情况会使异步操作由未完成状态转至已完成状态, 也就是在以下情况发生时调用notifyAll()方法: 
    15.  * <ul> 
    16.  * <li>异步操作被取消时(cancel方法)</li> 
    17.  * <li>异步操作正常结束时(setSuccess方法)</li> 
    18.  * <li>异步操作异常结束时(setFailure方法)</li> 
    19.  * </ul> 
    20.  * </pre> 
    21.  *  
    22.  * @author lixiaohui 
    23.  * 
    24.  * @param <V> 
    25.  *            异步执行结果的类型 
    26.  */  
    27. public class AbstractFuture<V> implements IFuture<V> {  
    28.   
    29.     protected volatile Object result; // 需要保证其可见性  
    30.         /** 
    31.          * 监听器集 
    32.          */  
    33.     protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>();  
    34.   
    35.     /** 
    36.      * 当任务正常执行结果为null时, 即客户端调用{@link AbstractFuture#setSuccess(null)}时,  
    37.      * result引用该对象 
    38.      */  
    39.     private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal();  
    40.   
    41.     @Override  
    42.     public boolean cancel(boolean mayInterruptIfRunning) {  
    43.         if (isDone()) { // 已完成了不能取消  
    44.             return false;  
    45.         }  
    46.   
    47.         synchronized (this) {  
    48.             if (isDone()) { // double check  
    49.                 return false;  
    50.             }  
    51.             result = new CauseHolder(new CancellationException());  
    52.             notifyAll(); // isDone = true, 通知等待在该对象的wait()的线程  
    53.         }  
    54.         notifyListeners(); // 通知监听器该异步操作已完成  
    55.         return true;  
    56.     }  
    57.       
    58.     @Override  
    59.     public boolean isCancellable() {  
    60.         return result == null;  
    61.     }  
    62.       
    63.     @Override  
    64.     public boolean isCancelled() {  
    65.         return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;  
    66.     }  
    67.   
    68.     @Override  
    69.     public boolean isDone() {  
    70.         return result != null;  
    71.     }  
    72.   
    73.     @Override  
    74.     public V get() throws InterruptedException, ExecutionException {  
    75.         await(); // 等待执行结果  
    76.   
    77.         Throwable cause = cause();  
    78.         if (cause == null) { // 没有发生异常,异步操作正常结束  
    79.             return getNow();  
    80.         }  
    81.         if (cause instanceof CancellationException) { // 异步操作被取消了  
    82.             throw (CancellationException) cause;  
    83.         }  
    84.         throw new ExecutionException(cause); // 其他异常  
    85.     }  
    86.   
    87.     @Override  
    88.     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {  
    89.         if (await(timeout, unit)) {// 超时等待执行结果  
    90.             Throwable cause = cause();  
    91.             if (cause == null) {// 没有发生异常,异步操作正常结束  
    92.                 return getNow();  
    93.             }  
    94.             if (cause instanceof CancellationException) {// 异步操作被取消了  
    95.                 throw (CancellationException) cause;  
    96.             }  
    97.             throw new ExecutionException(cause);// 其他异常  
    98.         }  
    99.         // 时间到了异步操作还没有结束, 抛出超时异常  
    100.         throw new TimeoutException();  
    101.     }  
    102.   
    103.     @Override  
    104.     public boolean isSuccess() {  
    105.         return result == null ? false : !(result instanceof CauseHolder);  
    106.     }  
    107.   
    108.     @SuppressWarnings("unchecked")  
    109.     @Override  
    110.     public V getNow() {  
    111.         return (V) (result == SUCCESS_SIGNAL ? null : result);  
    112.     }  
    113.   
    114.     @Override  
    115.     public Throwable cause() {  
    116.         if (result != null && result instanceof CauseHolder) {  
    117.             return ((CauseHolder) result).cause;  
    118.         }  
    119.         return null;  
    120.     }  
    121.   
    122.     @Override  
    123.     public IFuture<V> addListener(IFutureListener<V> listener) {  
    124.         if (listener == null) {  
    125.             throw new NullPointerException("listener");  
    126.         }  
    127.         if (isDone()) { // 若已完成直接通知该监听器  
    128.             notifyListener(listener);  
    129.             return this;  
    130.         }  
    131.         synchronized (this) {  
    132.             if (!isDone()) {  
    133.                 listeners.add(listener);  
    134.                 return this;  
    135.             }  
    136.         }  
    137.         notifyListener(listener);  
    138.         return this;  
    139.     }  
    140.   
    141.     @Override  
    142.     public IFuture<V> removeListener(IFutureListener<V> listener) {  
    143.         if (listener == null) {  
    144.             throw new NullPointerException("listener");  
    145.         }  
    146.   
    147.         if (!isDone()) {  
    148.             listeners.remove(listener);  
    149.         }  
    150.   
    151.         return this;  
    152.     }  
    153.   
    154.     @Override  
    155.     public IFuture<V> await() throws InterruptedException {  
    156.         return await0(true);  
    157.     }  
    158.   
    159.       
    160.     private IFuture<V> await0(boolean interruptable) throws InterruptedException {  
    161.         if (!isDone()) { // 若已完成就直接返回了  
    162.             // 若允许终端且被中断了则抛出中断异常  
    163.             if (interruptable && Thread.interrupted()) {  
    164.                 throw new InterruptedException("thread " + Thread.currentThread().getName() + " has been interrupted.");  
    165.             }  
    166.   
    167.             boolean interrupted = false;  
    168.             synchronized (this) {  
    169.                 while (!isDone()) {  
    170.                     try {  
    171.                         wait(); // 释放锁进入waiting状态,等待其它线程调用本对象的notify()/notifyAll()方法  
    172.                     } catch (InterruptedException e) {  
    173.                         if (interruptable) {  
    174.                             throw e;  
    175.                         } else {  
    176.                             interrupted = true;  
    177.                         }  
    178.                     }  
    179.                 }  
    180.             }  
    181.             if (interrupted) {  
    182.                 // 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的,   
    183.                 // 这里重新设置以便让其它代码知道这里被中断了。  
    184.                 Thread.currentThread().interrupt();  
    185.             }  
    186.         }  
    187.         return this;  
    188.     }  
    189.       
    190.     @Override  
    191.     public boolean await(long timeoutMillis) throws InterruptedException {  
    192.         return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);  
    193.     }  
    194.       
    195.     @Override  
    196.     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {  
    197.         return await0(unit.toNanos(timeout), true);  
    198.     }  
    199.   
    200.     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {  
    201.         if (isDone()) {  
    202.             return true;  
    203.         }  
    204.   
    205.         if (timeoutNanos <= 0) {  
    206.             return isDone();  
    207.         }  
    208.   
    209.         if (interruptable && Thread.interrupted()) {  
    210.             throw new InterruptedException(toString());  
    211.         }  
    212.   
    213.         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();  
    214.         long waitTime = timeoutNanos;  
    215.         boolean interrupted = false;  
    216.   
    217.         try {  
    218.             synchronized (this) {  
    219.                 if (isDone()) {  
    220.                     return true;  
    221.                 }  
    222.   
    223.                 if (waitTime <= 0) {  
    224.                     return isDone();  
    225.                 }  
    226.   
    227.                 for (;;) {  
    228.                     try {  
    229.                         wait(waitTime / 1000000, (int) (waitTime % 1000000));  
    230.                     } catch (InterruptedException e) {  
    231.                         if (interruptable) {  
    232.                             throw e;  
    233.                         } else {  
    234.                             interrupted = true;  
    235.                         }  
    236.                     }  
    237.   
    238.                     if (isDone()) {  
    239.                         return true;  
    240.                     } else {  
    241.                         waitTime = timeoutNanos - (System.nanoTime() - startTime);  
    242.                         if (waitTime <= 0) {  
    243.                             return isDone();  
    244.                         }  
    245.                     }  
    246.                 }  
    247.             }  
    248.         } finally {  
    249.             if (interrupted) {  
    250.                 Thread.currentThread().interrupt();  
    251.             }  
    252.         }  
    253.     }  
    254.   
    255.     @Override  
    256.     public IFuture<V> awaitUninterruptibly() {  
    257.         try {  
    258.             return await0(false);  
    259.         } catch (InterruptedException e) { // 这里若抛异常了就无法处理了  
    260.             throw new java.lang.InternalError();  
    261.         }  
    262.     }  
    263.       
    264.     @Override  
    265.     public boolean awaitUninterruptibly(long timeoutMillis) {  
    266.         try {  
    267.             return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);  
    268.         } catch (InterruptedException e) {  
    269.             throw new java.lang.InternalError();  
    270.         }  
    271.     }  
    272.   
    273.     @Override  
    274.     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {  
    275.         try {  
    276.             return await0(unit.toNanos(timeout), false);  
    277.         } catch (InterruptedException e) {  
    278.             throw new java.lang.InternalError();  
    279.         }  
    280.     }  
    281.   
    282.     protected IFuture<V> setFailure(Throwable cause) {  
    283.         if (setFailure0(cause)) {  
    284.             notifyListeners();  
    285.             return this;  
    286.         }  
    287.         throw new IllegalStateException("complete already: " + this);  
    288.     }  
    289.   
    290.     private boolean setFailure0(Throwable cause) {  
    291.         if (isDone()) {  
    292.             return false;  
    293.         }  
    294.   
    295.         synchronized (this) {  
    296.             if (isDone()) {  
    297.                 return false;  
    298.             }  
    299.             result = new CauseHolder(cause);  
    300.             notifyAll();  
    301.         }  
    302.   
    303.         return true;  
    304.     }  
    305.   
    306.     protected IFuture<V> setSuccess(Object result) {  
    307.         if (setSuccess0(result)) { // 设置成功后通知监听器  
    308.             notifyListeners();  
    309.             return this;  
    310.         }  
    311.         throw new IllegalStateException("complete already: " + this);  
    312.     }  
    313.   
    314.     private boolean setSuccess0(Object result) {  
    315.         if (isDone()) {  
    316.             return false;  
    317.         }  
    318.   
    319.         synchronized (this) {  
    320.             if (isDone()) {  
    321.                 return false;  
    322.             }  
    323.             if (result == null) { // 异步操作正常执行完毕的结果是null  
    324.                 this.result = SUCCESS_SIGNAL;  
    325.             } else {  
    326.                 this.result = result;  
    327.             }  
    328.             notifyAll();  
    329.         }  
    330.         return true;  
    331.     }  
    332.   
    333.     private void notifyListeners() {  
    334.         for (IFutureListener<V> l : listeners) {  
    335.             notifyListener(l);  
    336.         }  
    337.     }  
    338.   
    339.     private void notifyListener(IFutureListener<V> l) {  
    340.         try {  
    341.             l.operationCompleted(this);  
    342.         } catch (Exception e) {  
    343.             e.printStackTrace();  
    344.         }  
    345.     }  
    346.   
    347.     private static class SuccessSignal {  
    348.   
    349.     }  
    350.   
    351.     private static final class CauseHolder {  
    352.         final Throwable cause;  
    353.   
    354.         CauseHolder(Throwable cause) {  
    355.             this.cause = cause;  
    356.         }  
    357.     }  
    358. }  

     

    那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:

    Java代码  收藏代码
    1. package future.test;  
    2.   
    3. import future.IFuture;  
    4. import future.IFutureListener;  
    5.   
    6. /** 
    7.  * 延时加法 
    8.  * @author lixiaohui 
    9.  * 
    10.  */  
    11. public class DelayAdder {  
    12.       
    13.     public static void main(String[] args) {  
    14.         new DelayAdder().add(3 * 100012).addListener(new IFutureListener<Integer>() {  
    15.               
    16.             @Override  
    17.             public void operationCompleted(IFuture<Integer> future) throws Exception {  
    18.                 System.out.println(future.getNow());  
    19.             }  
    20.               
    21.         });  
    22.     }  
    23.     /** 
    24.      * 延迟加 
    25.      * @param delay 延时时长 milliseconds 
    26.      * @param a 加数 
    27.      * @param b 加数 
    28.      * @return 异步结果 
    29.      */  
    30.     public DelayAdditionFuture add(long delay, int a, int b) {  
    31.         DelayAdditionFuture future = new DelayAdditionFuture();   
    32.         new Thread(new DelayAdditionTask(delay, a, b, future)).start();  
    33.         return future;  
    34.     }  
    35.       
    36.     private class DelayAdditionTask implements Runnable {  
    37.   
    38.         private long delay;  
    39.           
    40.         private int a, b;  
    41.           
    42.         private DelayAdditionFuture future;  
    43.           
    44.         public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {  
    45.             super();  
    46.             this.delay = delay;  
    47.             this.a = a;  
    48.             this.b = b;  
    49.             this.future = future;  
    50.         }  
    51.   
    52.         @Override  
    53.         public void run() {  
    54.             try {  
    55.                 Thread.sleep(delay);  
    56.                 Integer i = a + b;  
    57.                 // TODO 这里设置future为完成状态(正常执行完毕)  
    58.                 future.setSuccess(i);  
    59.             } catch (InterruptedException e) {  
    60.                 // TODO 这里设置future为完成状态(异常执行完毕)  
    61.                 future.setFailure(e.getCause());  
    62.             }  
    63.         }  
    64.           
    65.     }  
    66. }  

     

    Java代码  收藏代码
    1. package future.test;  
    2.   
    3. import future.AbstractFuture;  
    4. import future.IFuture;  
    5. //只是把两个方法对外暴露  
    6. public class DelayAdditionFuture extends AbstractFuture<Integer> {  
    7.       
    8.     @Override  
    9.     public IFuture<Integer> setSuccess(Object result) {  
    10.         return super.setSuccess(result);  
    11.     }  
    12.       
    13.     @Override  
    14.     public IFuture<Integer> setFailure(Throwable cause) {  
    15.         return super.setFailure(cause);  
    16.     }  
    17.       
    18. }  

      可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。

  • 相关阅读:
    Decrease (Judge ver.)
    Raising Modulo Numbers
    最短Hamilton路径
    64位整数乘法
    递归系列——数组和对象的相关递归
    函数内容新增——函数表达式
    数据结构和算法(一)——栈
    (转)jQuery中append(),prepend()与after(),before()的区别
    微信端的user-Agent
    less知识点总结(二)
  • 原文地址:https://www.cnblogs.com/cxhfuujust/p/12622039.html
Copyright © 2011-2022 走看看