zoukankan      html  css  js  c++  java
  • Java并发编程的艺术笔记(九)——FutureTask详解

    FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,多用于耗时的计算。

    一.FutureTask的三种状态

     

     二.get()和cancel()执行示意

    三.使用

    一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。

    3.1 FutureTask + Thread

    package concurrent;  
    
    import java.util.Random;    
    import java.util.concurrent.Callable;    
    import java.util.concurrent.ExecutionException;    
    import java.util.concurrent.FutureTask;    
        
    /**  
     *  
     * @author Administrator  
     *  
     */    
    @SuppressWarnings("all")    
    public class FutureTaskDemo {    
        public static void main(String[] args) {    
            //step2:创建计算任务,作为参数,传入FutureTask   
            CalculateTask cTask = new CalculateTask();    
            FutureTask futureTask = new FutureTask(cTask);    
            //step3:将FutureTask提交给Thread执行    
            Thread pAccountThread = new Thread(futureTask);  
            System.out.println("futureTask线程现在开始启动,启动时间为:" + System.nanoTime());    
            pAccountThread.start();    
            System.out.println("主线程开始执行其他任务");    
            // 从其他账户获取总金额    
            int totalMoney = new Random().nextInt(100000);    
            System.out.println("现在你在其他账户中的总金额为" + totalMoney);    
            System.out.println("等待私有账户总金额统计完毕...");    
            // step4:测试后台的计算线程是否完成,如果未完成则等待    
            while (!futureTask.isDone()) {    
                try {    
                    Thread.sleep(500);    
                    System.out.println("私有账户计算未完成继续等待...");    
                } catch (InterruptedException e) {    
                    e.printStackTrace();    
                }    
            }    
            System.out.println("futureTask线程计算完毕,此时时间为" + System.nanoTime());    
            Integer privateAccountMoney = null;   
            //step5:获取执行结果
            try {    
                privateAccountMoney = (Integer) futureTask.get();    
            } catch (InterruptedException e) {    
                e.printStackTrace();    
            } catch (ExecutionException e) {    
                e.printStackTrace();    
            }    
            System.out.println("您现在的总金额为:" + totalMoney + privateAccountMoney.intValue());    
        }    
    }    
        
    /**
     * @author admin
     *step1:封装一个计算任务,计算私有账户余额,实现Callable接口
     */
    @SuppressWarnings("all")    
    class CalculateTask implements Callable {    
        Integer totalMoney;    
        
        @Override    
        public Object call() throws Exception {
            //模拟耗时操作
            Thread.sleep(5000);    
            totalMoney = new Integer(new Random().nextInt(10000));    
            System.out.println("您当前有" + totalMoney + "在您的私有账户中");    
            return totalMoney;    
        }    
    }

    执行结果:

    futureTask线程现在开始启动,启动时间为:57541141963086
    主线程开始执行其他任务
    现在你在其他账户中的总金额为30431
    等待私有账户总金额统计完毕...
    私有账户计算未完成继续等待...
    私有账户计算未完成继续等待...
    私有账户计算未完成继续等待...
    私有账户计算未完成继续等待...
    私有账户计算未完成继续等待...
    私有账户计算未完成继续等待...
    私有账户计算未完成继续等待...
    私有账户计算未完成继续等待...
    私有账户计算未完成继续等待...
    您当前有4831在您的私有账户中
    私有账户计算未完成继续等待...
    futureTask线程计算完毕,此时时间为57546161371100
    您现在的总金额为:304314831
    

     3.2 Future + ExecutorService

    //step1 ......
    //step2:创建计算任务
    Task task = new Task();
    //step3:创建线程池,将Callable类型的task提交给线程池执行,通过Future获取子任务的执行结果
    ExecutorService executorService = Executors.newCachedThreadPool();
    final Future<Boolean> future = executorService.submit(task);
    //step4:通过future获取执行结果
    boolean result = (boolean) future.get();

    四.源码解析

    4.1 状态

    private volatile int state;
     private static final int NEW          = 0; 
     private static final int COMPLETING   = 1;
     private static final int NORMAL       = 2;
     private static final int EXCEPTIONAL  = 3;
     private static final int CANCELLED    = 4;
     private static final int INTERRUPTING = 5;
     private static final int INTERRUPTED  = 6;

    4.2 构造函数

    /**
         * Creates a {@code FutureTask} that will, upon running, execute the
         * given {@code Callable}.
         *
         * @param  callable the callable task
         * @throws NullPointerException if the callable is null
         */
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }

    可以看到FutrueTask传入的是一个Callable类型的变量,将传入的参数赋值给this.callable,然后设置state状态为NEW,表示这是新任务。

    也可以使用Runnable+result组合成一个Callable

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

     4.3执行任务——run()方法

    public void run() {
        //1.判断状态是否是NEW,不是NEW,说明任务已经被其他线程执行,甚至执行结束,或者被取消了,直接返回
        //2.调用CAS方法,判断runnerOffset为null的话,就将当前线程保存到runnerOffset中
        //设置runnerOffset失败,就直接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            //执行callable任务,返回result
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //遇到异常,设置异常
                    setException(ex);
                }
                //执行完毕,设置result
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            //设置runner为null
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            //如果被中断了,处理中断
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    看一下runnerOffset是啥:

    private static final sun.misc.Unsafe UNSAFE;private static final long runnerOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
    runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); } catch (Exception e) { throw new Error(e); } }

    可以看到它指向runner字段的偏移地址,相当于指针

    发生异常:

    protected void setException(Throwable t) {
         // stateOffset NEW --> COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    //返回异常信息 outcome
    = t;
    //stateOffset --> EXCEPTIONAL UNSAFE.putOrderedInt(
    this, stateOffset, EXCEPTIONAL); // final state
    //结束 finishCompletion(); } }

    正常执行:

    protected void set(V v) {
        //state New --> COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //返回结果
            outcome = v;
            //state --> NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            //结束
            finishCompletion();
        }
    }

    被中断:

    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                //让出线程
                Thread.yield();
    }

    4.4 获取结果——get()方法

    public V get() throws InterruptedException, ExecutionException {
           //获取状态
            int s = state;
            //如果没有完成则等待
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }

    进去看一下awaitDone()方法

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        
        for (;;) {
            //如果被中断,则移除wait节点,并返回异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            //1 得到状态
            int s = state;
             //1.1 如果s > COMPLETING,直接返回
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //1.2 如果s == COMPLETING但是还没写到outcome,表示任务结束,让出线程
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //q == null,则  new 一个新的节点
            else if (q == null)
                q = new WaitNode();
            //如果还没有加入等待队列,则加入队列头
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            //如果设置了超时时间
            else if (timed) {
                nanos = deadline - System.nanoTime();
                //时间到了移除节点,停止等待
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //时间没到,阻塞,等待
                LockSupport.parkNanos(this, nanos);
            }
            //没设置则阻塞,等待
            else
                LockSupport.park(this);
        }
    }

    4.5 cancel()方法

    public boolean cancel(boolean mayInterruptIfRunning) {
        //state != NEW,返回false
        //state == NEW,判断是否要中断任务的执行,是则stateOffset=INTERRUPTING,否则stateOffset=CANCELLED
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    //读取当前正在执行子任务的线程runner,调用t.interrupt(),中断线程执行
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    //stateOffset = INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    4.6 最终方法finishCompletion(),释放资源

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
    
        done();
    
        callable = null;        // to reduce footprint
    }
  • 相关阅读:
    关于这个 blog
    P6499 [COCI2016-2017#2] Burza 题解
    CF1172F Nauuo and Bug 题解
    CF1479D Odd Mineral Resource 题解
    CF1442E Black, White and Grey Tree 题解
    CF1442D Sum 题解
    CF1025D Recovering BST 题解
    CF1056E Check Transcription 题解
    CF1025F Disjoint Triangles 题解
    红包算法的PHP实现
  • 原文地址:https://www.cnblogs.com/lingluo2017/p/10254371.html
Copyright © 2011-2022 走看看