zoukankan      html  css  js  c++  java
  • Java并发包9--Future接口简介

    前言

    在单线程模型下,想要获取方法执行的结果比较简单,直接获取方法返回的结果即可。但是在多线程情况下,如何获取其他线程执行的结果,此时就涉及到多线程之间数据传输的问题。比如A线程内部开启B线程和C线程,A线程如何获取B、C线程的执行结果呢?另外在B、C线程执行过程中,A线程是一直处于阻塞状态等待还是非阻塞继续执行呢?如果能够获取到B、C线程的结果那么何时去获取呢?这都是获取多线程执行结果需要解决的问题,而在JUC中就提供了Future就可以实现。

    1、Callable接口

    想要通过异步获取结果,就离不开Callable接口,众所周知创建线程可以通过继承Thread、实现Runnable的方式,另外还有一种可以返回操作结果的方式就是实现Callable接口

    Callable接口只定义了一个方法,就是返回操作结果,定义如下:

    @FunctionalInterface
    public interface Callable<V> {
        /**
         * 返回操作结果或者直接抛异常
         */
        V call() throws Exception;
    }

    Callable接口可以返回操作的结果,那么还需要有一个对象来接收操作的结果,该对象就是Future对象

    2、Future接口

    Future 用于获取异步操作的结果,异步操作之后,结果会存在Future中,可以通过get()方法获取,Future接口定义如下:

    public interface Future<V> {
            /**
             * 取消异步操作
             */
            boolean cancel(boolean mayInterruptIfRunning);
    
            /**
             * 判断异步操作是否取消
             */
            boolean isCancelled();
    
            /**
             * 判断异步操作是否完成
             */
            boolean isDone();
    
            /**
             * 获取异步结果
             */
            V get() throws InterruptedException, ExecutionException;
    
            /**
             * 获取异步结果,并且设置了超时时间
             */
            V get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException;
        }

    3、FutureTask类

    FutureTask是Future的实现类,并且实现了Runnable接口,构造函数是Callable类型,所以FutureTask的作用就是创建线程执行FutureTask的内容并获取结果

    3.1、使用案例

    使用案例如下:

     1 public static void main(String[] args) throws ExecutionException, InterruptedException {
     2         /** 创建FutureTask,构造函数传入Callable*/
     3         FutureTask futureTask = new FutureTask(new Callable() {
     4             @Override
     5             public Object call() throws Exception {
     6                 Thread.sleep(3000L);
     7                 System.out.println("返回执行结果");
     8                 return "test";
     9             }
    10         });
    11         System.out.println("等待执行结果");
    12         /** 创建线程运行Callable内容*/
    13         futureTask.run();
    14         /** 阻塞等待FutureTask的执行结果*/
    15         String result = (String) futureTask.get();
    16         System.out.println("结果为:"+result);
    17     }

    用法比较简单,首先通过构造函数传入Callable来构建FutureTask对象,然后调用FutureTask的run方法创建线程执行Callable的call方法,最后主线程调用FutureTask的get方法阻塞等待获取call方法执行的结果

    3.2、实现原理

    FutureTask实现了Runnable接口和Future接口,所以需要实现Runnable的run方法以及Future的5个方法,这里重点分析下run方法和get方法即可。

    首先从FutureTask的构造方法分析,构造方法如下:

        /** 任务状态*/
        private volatile int state;
        /** 任务*/
        private Callable<V> callable;
        /** 执行任务线程 */
        private volatile Thread runner;
    
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            /** 状态为新建状态*/
            this.state = NEW;       
        }

    FutureTask内部有三个核心属性,分别表示任务的状态、任务的具体内容和执行任务的线程,构造方法创建FutureTask时会初始化callable和state两个属性,而执行的线程runner会留在任务执行时再创建.

    FutureTask的run方法逻辑如下:

     1 public void run() {
     2         /** 1.通过Unsafe的CAS将当前线程Thread对象赋值给当前的FutureTask对象的runner属性 */
     3         if (state != NEW ||
     4                 !UNSAFE.compareAndSwapObject(this, runnerOffset,
     5                         null, Thread.currentThread()))
     6             return;
     7         try {
     8             Callable<V> c = callable;
     9             if (c != null && state == NEW) {
    10                 V result;
    11                 /** 完成标志 */
    12                 boolean ran;
    13                 try {
    14                     /** 2.执行Callable的call方法并获取结果 */
    15                     result = c.call();
    16                     ran = true;
    17                 } catch (Throwable ex) {
    18                     result = null;
    19                     ran = false;
    20                     setException(ex);
    21                 }
    22                 if (ran)
    23                     /** 3.设置操作结果 */
    24                     set(result);
    25             }
    26         } finally {
    27             /** 状态重制*/
    28             runner = null;
    29             int s = state;
    30             if (s >= INTERRUPTING)
    31                 handlePossibleCancellationInterrupt(s);
    32         }
    33     }

    逻辑比较清晰,首先是给FutureTask对象的runner赋值为当前线程Thread对象;然后直接执行Callable的call()方法执行;最好将call()执行结果通过set方法进行设置,具体如何设置就需要继续查看set方法逻辑如下,

     1  /** 设置操作结果*/
     2     protected void set(V v) {
     3         /** 1.设置FutureTask对象的state属性 */
     4         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
     5             /** 2.赋值操作结果*/
     6             outcome = v;
     7             /** 3.通过CAS设置state为完成状态NORMAL */
     8             UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
     9             /** 4.完成操作*/
    10             finishCompletion();
    11         }
    12     }
    13 
    14     /** 完成操作*/
    15     private void finishCompletion() {
    16         /** 1.获取当前等待节点waiters赋值给q */
    17         for (WaitNode q; (q = waiters) != null;) {
    18             /** 2.死循环中判断当前waiters设置为null */
    19             if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
    20                 for (;;) {
    21                     Thread t = q.thread;
    22                     if (t != null) {
    23                         q.thread = null;
    /** 唤醒等待结果的线程*/
    24 LockSupport.unpark(t); 25 } 26 WaitNode next = q.next; 27 if (next == null) 28 break; 29 q.next = null; // unlink to help gc 30 q = next; 31 } 32 break; 33 } 34 } 35 /** 2.任务完成,空实现,子类可用于扩展*/ 36 done(); 37 /** 3.重置callable属性 */ 38 callable = null; // to reduce footprint 39 }

    核心就是通过CAS更新FutureTask的状态,然后将操作结果赋值的FutureTask的outcome属性用于存储操作结果,而最后如果想要获取操作结果,就可以通过FutureTask的get方法获取outcome的值即可,逻辑如下:

     1 public V get() throws InterruptedException, ExecutionException {
     2         int s = state;
     3         if (s <= COMPLETING)
     4             /** 如果当前状态不是已完成状态,那么就执行await进入等待状态,等待被唤醒*/
     5             s = awaitDone(false, 0L);
     6         return report(s);
     7     }
     8 
     9     private V report(int s) throws ExecutionException {
    10         Object x = outcome;
    11         if (s == NORMAL)
    12             /** 返回操作结果*/
    13             return (V)x;
    14         if (s >= CANCELLED)
    15             throw new CancellationException();
    16         throw new ExecutionException((Throwable)x);
    17     }

    在获取结果时判断当前状态是否是已经完成,如果不是就需要调用awaitDone方法等待操作结果,实际是调用LockSupport的park方法进入等待状态,逻辑如下:

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }

    总结:

    1、FutureTask内部有Callable对象,FutureTask实现了Future和Runnable接口,执行run方法时会用调用run方法的线程执行Callable的call方法,并且将结果存入FutureTask的内部属性outcome中;

    2、当不同线程调用FutureTask对象的get方法时,如果当前FutureTask状态已完成就直接返回结果;如果还未完成就通过LockSupport的park方法进行阻塞等待,多个线程等待就会封装成WaitNode节点对象组成链表结构;

    3、当FutureTask任务执行完成之后,如果等待链表存在就会遍历链表中WaitNode节点,并依次通过LockSupport的unpark方法唤醒等待的线程,等待的线程被唤醒后就可以直接获取FutureTask的结果

    4、FutureTask通常会和线程池配合使用,将FutureTask直接提交给线程池处理

  • 相关阅读:
    钱多多软件制作04
    团队项目01应用场景
    HDU 4411 arrest
    HDU 4406 GPA
    HDU 3315 My Brute
    HDU 3667 Transportation
    HDU 2676 Matrix
    欧拉回路三水题 POJ 1041 POJ 2230 POJ 1386
    SPOJ 371 BOXES
    POJ 3422 Kaka's Matrix Travels
  • 原文地址:https://www.cnblogs.com/jackion5/p/15018991.html
Copyright © 2011-2022 走看看