zoukankan      html  css  js  c++  java
  • Java 多线程(五)—— 线程池基础 之 FutureTask源码解析

    FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。
    如:

    1. 取消任务执行
    2. 查询任务是否执行完成
    3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。
      注意:一旦任务执行完成或取消任务,则不能执行取消任务或者重新启动任务。(除非一开始就使用runAndReset模式运行任务)

    FutureTask实现了Runnable接口和Future接口,因此FutureTask可以传递到线程对象Thread或Excutor(线程池)来执行。

    如果在当前线程中需要执行比较耗时的操作,但又不想阻塞当前线程时,可以把这些作业交给FutureTask,另开一个线程在后台完成,当当前线程将来需要时,就可以通过FutureTask对象获得后台作业的计算结果或者执行状态。

    示例

     1 public class FutureTaskDemo {
     2     public static void main(String[]args)throws InterruptedException {
     3         FutureTask < Integer > ft = new FutureTask <  > (new Callable < Integer > () {
     4                  @Override 
     5                  public Integer call()throws Exception {
     6                     int num = new Random().nextInt(10);
     7                     TimeUnit.SECONDS.sleep(num);
     8                     return num;
     9                 }
    10             });
    11         Thread t = new Thread(ft);
    12         t.start(); 
    13         //这里可以做一些其它的事情,跟futureTask任务并行,等需要futureTask的运行结果时,可以调用get方法获取
    14         try { 
    15             //等待任务执行完成,获取返回值
    16             Integer num = ft.get();
    17             System.out.println(num);
    18         } catch (Exception e) {
    19             e.printStackTrace();
    20         }
    21     }
    22 }

    FutureTask 源码分析

    JDK1.8自己实现了一个同步等待队列,在结果返回之前,所有的线程都被阻塞,存放到等待队列中。

    下面我们来分析下JDK1.8的FutureTask 源码

    FutureTask 类结构

     1 public class FutureTask<V> implements RunnableFuture<V> { 
     2 /** * 当前任务的运行状态。 
     3 * 
     4 * 可能存在的状态转换 
     5 * NEW -> COMPLETING -> NORMAL(有正常结果) 
     6 * NEW -> COMPLETING -> EXCEPTIONAL(结果为异常) 
     7 * NEW -> CANCELLED(无结果) 
     8 * NEW -> INTERRUPTING -> INTERRUPTED(无结果) 
     9 */ 
    10 private volatile int state; 
    11 private static final int NEW = 0; //初始状态 
    12 private static final int COMPLETING = 1; //结果计算完成或响应中断到赋值给返回值之间的状态。 
    13 private static final int NORMAL = 2; //任务正常完成,结果被set 
    14 private static final int EXCEPTIONAL = 3; //任务抛出异常 
    15 private static final int CANCELLED = 4; //任务已被取消 
    16 private static final int INTERRUPTING = 5; //线程中断状态被设置ture,但线程未响应中断 
    17 private static final int INTERRUPTED = 6; //线程已被中断 
    18 
    19 //将要执行的任务 
    20 private Callable<V> callable; //用于get()返回的结果,也可能是用于get()方法抛出的异常 
    21 private Object outcome; // non-volatile, protected by state reads/writes //执行callable的线程,调用FutureTask.run()方法通过CAS设置 
    22 private volatile Thread runner; //栈结构的等待队列,该节点是栈中的最顶层节点。 
    23 private volatile WaitNode waiters; 
    24 .... 

    FutureTask实现的接口信息如下:

    RunnableFuture 接口

    1 public interface RunnableFuture<V> extends Runnable, Future<V> {
    2     void run();
    3 }

    RunnableFuture 接口基础了Runnable和Future接口

    Future 接口

     1 public interface Future<V> { 
     2     //取消任务 
     3     boolean cancel(boolean mayInterruptIfRunning); 
     4     //判断任务是否已经取消 
     5     boolean isCancelled(); 
     6     //判断任务是否结束(执行完成或取消) 
     7     boolean isDone(); 
     8     //阻塞式获取任务执行结果 
     9     V get() throws InterruptedException, ExecutionException; 
    10     //支持超时获取任务执行结果 
    11     V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 
    12 }

    run 方法

     1 public void run() {
     2     //保证callable任务只被运行一次
     3     if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
     4         return;
     5     try {
     6         Callable < V > c = callable;
     7         if (c != null && state == NEW) {
     8             V result;
     9             boolean ran;
    10             try { 
    11                 //执行任务,上面的例子我们可以看出,call()里面可能是一个耗时的操作,不过这里是同步的
    12                 result = c.call();
    13                 //上面的call()是同步的,只有上面的result有了结果才会继续执行
    14                 ran = true;
    15             } catch (Throwable ex) {
    16                 result = null;
    17                 ran = false;
    18                 setException(ex);
    19             }
    20             if (ran)
    21                 //执行完了,设置result
    22                 set(result);
    23         }
    24     }
    25     finally {
    26         runner = null;
    27         int s = state;
    28         //判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
    29         if (s >= INTERRUPTING)
    30             handlePossibleCancellationInterrupt(s);
    31     }
    32 }

    1.如果state状态不为New或者设置运行线程runner失败则直接返回false,说明线程已经启动过,保证任务在同一时刻只被一个线程执行。
    2.调用callable.call()方法,如果调用成功则执行set(result)方法,将state状态设置成NORMAL。如果调用失败抛出异常则执行setException(ex)方法,将state状态设置成EXCEPTIONAL,唤醒所有在get()方法上等待的线程。
    3.如果当前状态为INTERRUPTING(步骤2已CAS失败),则一直调用Thread.yield()直至状态不为INTERRUPTING

    set方法

    1 protected void set(V v) {
    2     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    3         outcome = v;
    4         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    5         finishCompletion();
    6     }
    7 }
    1. 首先通过CAS把state的NEW状态修改成COMPLETING状态。
    2. 修改成功则把v值赋给outcome变量。然后再把state状态修改成NORMAL,表示现在可以获取返回值。
    3. 最后调用finishCompletion()方法,唤醒等待队列中的所有节点。

    finishCompletion方法

     1 private void finishCompletion() {
     2     for (WaitNode q; (q = waiters) != null; ) { 
     3         //通过CAS把栈顶的元素置为null,相当于弹出栈顶元素
     4         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
     5             for (; ; ) {
     6                 Thread t = q.thread;
     7                 if (t != null) {
     8                     q.thread = null;
     9                     LockSupport.unpark(t);
    10                 }
    11                 WaitNode next = q.next;
    12                 if (next == null)
    13                     break;
    14                 q.next = null; // unlink to help gc
    15                 q = next;
    16             }
    17             break;
    18         }
    19     }
    20     done();
    21     callable = null; // to reduce footprint
    22 }

    把栈中的元素一个一个弹出,并通过 LockSupport.unpark(t)唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能cancel,异常等)

    runAndReset 方法

     1 protected boolean runAndReset() {
     2     if (state != NEW ||
     3         !UNSAFE.compareAndSwapObject(this, runnerOffset,
     4                                      null, Thread.currentThread()))
     5         return false;
     6     boolean ran = false;
     7     int s = state;
     8     try {
     9         Callable<V> c = callable;
    10         if (c != null && s == NEW) {
    11             try {
    12                 // 执行任务,和run方法不同的是这里不需要设置返回值
    13                 c.call(); // don't set result
    14                 ran = true;
    15             } catch (Throwable ex) {
    16                 setException(ex);
    17             }
    18         }
    19     } finally {
    20         // runner must be non-null until state is settled to
    21         // prevent concurrent calls to run()
    22         runner = null;
    23         // state must be re-read after nulling runner to prevent
    24         // leaked interrupts
    25         s = state;
    26         if (s >= INTERRUPTING)
    27             handlePossibleCancellationInterrupt(s);
    28     }
    29     //这里并没有改变state的状态,还是NEW状态
    30     return ran && s == NEW;
    31 }
    runAndReset()和run()方法最大的区别是 runAndReset 不需要设置返回值,并且不需要改变任务的状态,也就是不改变state的状态,一直是NEW状态。

    get方法

    1 public V get()throws InterruptedException, ExecutionException {
    2     int s = state;
    3     if (s <= COMPLETING)
    4         s = awaitDone(false, 0L);
    5     return report(s);
    6 }

    如果state状态小于等于COMPLETING,说明任务还没开始执行或还未执行完成,然后调用awaitDone方法阻塞该调用线程。

    如果state的状态大于COMPLETING,则说明任务执行完成,或发生异常、中断、取消状态。直接通过report方法返回执行结果。

    awaitDone 方法

     1 private int awaitDone(boolean timed, long nanos)throws InterruptedException {
     2     final long deadline = timed ? System.nanoTime() + nanos : 0L;
     3     WaitNode q = null;
     4     boolean queued = false;
     5     for (; ; ) { 
     6         //如果该线程执行interrupt()方法,则从队列中移除该节点,并抛出异常
     7         if (Thread.interrupted()) {
     8             removeWaiter(q);
     9             throw new InterruptedException();
    10         }
    11         int s = state; 
    12         //如果state状态大于COMPLETING 则说明任务执行完成,或取消
    13         if (s > COMPLETING) {
    14             if (q != null)
    15                 q.thread = null;
    16             return s;
    17         } 
    18         //如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快。
    19         else if (s == COMPLETING) // cannot time out yet
    20             Thread.yield(); 
    21         //构建节点
    22         else if (q == null)
    23             q = new WaitNode();
    24         //把当前节点入栈
    25         else if (!queued)
    26             queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
    27         //如果需要阻塞指定时间,则使用LockSupport.parkNanos阻塞指定时间
    28         //如果到指定时间还没执行完,则从队列中移除该节点,并返回当前状态
    29         else if (timed) {
    30             nanos = deadline - System.nanoTime();
    31             if (nanos <= 0L) {
    32                 removeWaiter(q);
    33                 return state;
    34             }
    35             LockSupport.parkNanos(this, nanos);
    36         }
    37         //阻塞当前线程
    38         else
    39             LockSupport.park(this);
    40     }
    41 }

    构建栈链表的节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。

    report方法

    1 private V report(int s)throws ExecutionException {
    2     Object x = outcome;
    3     if (s == NORMAL)
    4         return (V)x;
    5     if (s >= CANCELLED)
    6         throw new CancellationException();
    7     throw new ExecutionException((Throwable)x);
    8 }

    如果state的状态为NORMAL,说明任务正确执行完成,直接返回计算后的值。
    如果state的状态大于等于CANCELLED,说明任务被成功取消执行、或响应中断,直接返回CancellationException异常
    否则返回ExecutionException异常。

    总结

      1.任务开始运行后,不能在次运行,保证只运行一次(runAndReset 方法除外)
      2.任务还未开始,或者任务已被运行,但未结束,这两种情况下都可以取消; 如果任务已经结束,则不可以被取消 。

     

  • 相关阅读:
    [轉]多memcached 和 mysql 主从 环境下PHP开发: 代码详解
    [轉]windows下mysql主从同步备份步骤
    PHP Mysqli Class收集
    [轉]ASP的类(class)详解
    [轉]pdo 帮助类 pdo class php 数据库 处理层 pdo dbhelper
    [轉]FCK编辑区的背景色 FCK工具条的背景 修改
    XSchEmA, Change your entire life! 发布倒计时 2天
    RBAC 权限入门
    防范式编程原则参数验证
    Noebe V2.0 美味的持久层 API 文档 一
  • 原文地址:https://www.cnblogs.com/java-chen-hao/p/10243509.html
Copyright © 2011-2022 走看看