zoukankan      html  css  js  c++  java
  • Java8 FutureTask 分析

    实现FutureTask的要点

    1.需要实现一个链表(每个节点包含当前线程的引用)

    2.通过LockSupport.park 对线程进行阻塞

    3.节点的唤醒(task完成, 线程Interrupt, 或await超时),

     
    FutureTask.run 方法
    public void run() {
            // 判断 state 是否是new, 防止并发重复执行
            if(state != NEW ||
                    !unsafe.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())){
                return;
            }
    
            try {
                Callable<V> c = callable;
                if(c != null && state == NEW){
                    V result ;
                    boolean ran;
                    try{ // 调用call方法执行计算
                        result = c.call();
                        ran = true;
                    }catch (Throwable ex){
                        result = null;
                        ran = false;
                        // 执行中抛异常, 更新state状态, 释放等待的线程(调用finishCompletion)
                        setException(ex);
                    }
                    if(ran){ // 执行成功, 进行赋值操作
                        set(result);
                    }
                }
            }finally {
                 // runner must be non-null until state is settled to prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent leaked interrupts
                int s = state;
                if(s >= INTERRUPTING){
                    handlePossibleCancellationInterrupt(s);
                }
            }
        }

    这里看到state这个变量, 它是futureTask执行任务的状态(一个有7种)
      /**
         * 这几种状态比较重要, 是 FutureTask 中 state 的状态转变的几种情况
         * Possible state's transitions
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INETRRUPTED
         */
    
        private volatile int state;
        private static final int NEW             = 0;
        private static final int COMPLETING     = 1;
        private static final int NORMAL          = 2;
        private static final int EXCEPTIONAL    = 3;
        private static final int CANCELLED      = 4;
        private static final int INTERRUPTING   = 5;
        private static final int INTERRUPTED      = 6;
    

    而run其实没做什么, 就是执行 callable.call方法, 成功的话将执行结果调用set进行赋值, 并更新state的值(通过cas)

     future.get(timeout,TimeUnit) 方法
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            // get(timeout, unit) 也很简单, 主要还是在 awaitDone里面
            if(unit == null){
                throw new NullPointerException();
            }
            int s = state;
            // 判断state状态是否 <= Completing, 调用awaitDone进行旋转
            if(s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING){
                throw new TimeoutException();
            }
            // 根据state的值进行返回结果或抛出异常
            return report(s);
        }

    get() 方法中涉及到 awaitDone 方法, 将awaitDone的运行结果赋值给state, 最后report方法根据state值进行返回相应的值, 而awaitDone是整个 FutureTask 运行的核心
    那下面来看 awaitDone的方法
    /**
         * Awaits completion or aborts on interrupt or timeout
         * 调用 awaitDone 进行线程的自旋
         * 自旋一般调用步骤
         *  1) 若支持线程中断, 判断当前的线程是否中断
         *      a. 中断, 退出自旋, 在线程队列中移除对应的节点
         *      b. 进行下面的步骤
         *  2) 将当前的线程构造成一个 WaiterNode 节点, 加入到当前对象的队列里面 (进行 cas 操作)
         *  3) 判断当前的调用是否设置阻塞超时时间
         *      a. 有 超时时间, 调用 LockSupport.parkNanos; 阻塞结束后, 再次进行 自旋 , 还是到同一个if, 但 nanos = 0L, 删除链表中对应的 WaiterdNode, 返回 state值
         *      b. 没 超时时间, 调用 LockSupport.park
         *
         * @param timed true if use timed waits
         * @param nanos time to waits, if timed
         * @return state upon completion
         */
        private int awaitDone(boolean timed, long nanos) throws InterruptedException{
            // default timed = false, nanos = 0, so deadline = 0
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for(;;){
                // Thread.interrupted 判断当前的线程是否中断(调用两次会清楚对应的状态位)
                // Thread.interrupt 将当前的线程设置成中断状态
                if(Thread.interrupted()){
                    removeWaiter(q, Thread.currentThread().getId());
                    throw new InterruptedException();
                }
    
                int s = state;
                /** 1. s = NORMAL, 说明程序执行成功, 直接获取对应的 V
                 */
                if(s > COMPLETING){
                    if(q != null){
                        q.thread = null;
                    }
                    return s;
                }
                // s = COMPLETING ; 看了全部的代码说明整个任务在处理的中间状态, s紧接着会进行改变
                // s 变成 NORMAL 或 EXCEPTION
                // 所以调用 yield 让线程状态变更, 重新进行CPU时间片竞争, 并且进行下次循环
                else if(s == COMPLETING){ // cannot time out yet
                    Thread.yield();
                }
                // 当程序调用 get 方法时, 一定会调用一次下面的方法, 对 q 进行赋值
                else if(q == null){
                    q = new WaitNode();
                }
                // 判断有没将当前的线程构造成一个节点, 赋值到对象对应的属性里面
                // 第一次 waiters 一定是 null 的, 进行赋值的是一个以 q 为首节点的栈(JUC里面还有一处用栈的就在 SynchronousQueue中)
                else if(!queued){
                    queued = unsafe.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
                }
                // 调用默认的 get()时, timed = false, 所以不执行这一步
                else if(timed){
                    // 进行阻塞时间的判断, 第二次循环时, nanos = 0L, 直接 removeWaiter 返回现在 FutureTask 的 state
                    nanos = deadline - System.nanoTime();
                    if(nanos <= 0L){
                        removeWaiter(q, Thread.currentThread().getId());
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                // 进行线程的阻塞
                else{
                    LockSupport.park(this);
                }
            }
        }

    结合我们刚才例子(FutureMain)中的两个调用futureTask.get()方法

    第一个futureTask.get(2. TimeUnit.SECOND), 因为执行的任务需要花费3秒, 所以它先会LockSupport.parkNanos(210001000*1000) 阻塞2秒, 之后再次进行同样的地方, 但nanos已是0, 所以调用removeWaiter方法, 最后抛出异常

    第二个futureTask.get(4. TimeUnit.SECOND), 因为执行的任务需要花费3秒, 所以它先会LockSupport.parkNanos(410001000*1000) 阻塞4秒, 但是任务只花费3秒, 所以执行完成后会调用set方法进行赋值, 在set方法中有个finishCompletion方法, 这个方法会唤醒所有阻塞的节点, 所以第二个futureTask.get只花费3秒就得到了结果

    分析一下 removeWaiter 方法(这是实现并发链表中移除队列节点的一个操作)
    /**
         * Tries to unlinked a time-out
         * @param node
         */
        private void  removeWaiter(WaitNode node, long i){
            logger.info("removeWaiter node"  + node +", i: "+ i +" begin");
            if(node != null){
                node.thread = null; // 将移除的节点的thread=null, 为移除做标示
    
                retry:
                for(;;){ // restart on removeWaiter race
                    for(WaitNode pred = null, q = waiters, s; q != null; q = s){
                        logger.info("q : " + q +", i:"+i);
                        s = q.next;
                        // 通过 thread 判断当前 q 是否是需要移除的 q节点
                        if(q.thread != null){
                            pred = q;
                            logger.info("q : " + q +", i:"+i);
                        }
                        // 何时执行到这个if条件 ?
                        // hehe 只有第一步不满足时, 也就是q.thread=null (p就是应该移除的节点)
                        else if(pred != null){
                            logger.info("q : " + q +", i:"+i);
                            pred.next = s; // 将前一个节点的 next 指向当前节点的 next 节点
                            // pred.thread == null 这种情况是在多线程进行并发 removeWaiter 时产生的
                            // 而此时真好移除节点 node 和 pred, 所以loop跳到retry, 在进行一次
                            if(pred.thread == null){ // check for race
                                continue retry;
                            }
                        }
                        // 这一步何时操作呢?
                        // 想想 若p是头节点
                        else if(!unsafe.compareAndSwapObject(this, waitersOffset, q, s)){
                            logger.info("q : " + q +", i:"+i);
                            continue retry; // 这一步还是 cheak for race
                        }
                    }
                    break ;
                }
                logger.info("removeWaiter node"  + node +", i: "+ i +" end");
            }
        }

    removeWaiter 这个方法我认为是最复杂的, 你需要考虑多种情况
    1. 移除的节点是队列的头节点
    2. 移除的节点是队列中的中间节点
    3. 在并发情况下, 两个线程同时removeWaiter操作)

    debug代码:
    public class TestFutureTask {
    
    
        public static void main(String[] args) throws Exception {
    
            /**
             * 第一种方式:Future + ExecutorService
             * Task task = new Task();
             * ExecutorService service = Executors.newCachedThreadPool();
             * Future<Integer> future = service.submit(task1);
             * service.shutdown();
             */
    
    
            /**
             * 第二种方式: FutureTask + ExecutorService
             * ExecutorService executor = Executors.newCachedThreadPool();
             * Task task = new Task();
             * FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
             * executor.submit(futureTask);
             * executor.shutdown();
             */
    
            /**
             * 第三种方式:FutureTask + Thread
             */
    
            // 2. 新建FutureTask,需要一个实现了Callable接口的类的实例作为构造函数参数
            FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
            // 3. 新建Thread对象并启动
            Thread thread = new Thread(futureTask);
            thread.setName("Task thread");
            thread.start();
    
            // 4. 调用isDone()判断任务是否结束
    //        if(!futureTask.isDone()) {
    //            System.out.println("Task is not done");
    //            Thread.sleep(2000);
    //        }
    
            // 5. 调用get()方法获取任务结果,如果任务没有执行完成则阻塞等待
            new Thread(()->{
                try {
                    System.out.println("thread one result is " + futureTask.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }, "thread one").start();
    
            new Thread(()->{
                try {
                    System.out.println("thread two result is " + futureTask.get(1, TimeUnit.SECONDS));
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    e.printStackTrace();
                }
            }, "thread two").start();
    
    
        }
    
        // 1. 继承Callable接口,实现call()方法,泛型参数为要返回的类型
        static class Task  implements Callable<Integer> {
    
            @Override
            public Integer call() throws Exception {
                System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");
                int result = 0;
                for(int i = 0; i < 100;++i) {
                    result += i;
                }
    
                Thread.sleep(20*1000);
                return result;
            }
        }
    }


    作者:爱吃鱼的KK
    链接:https://www.jianshu.com/p/b765c0d0165d
    來源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    149. Max Points on a Line(js)
    148. Sort List(js)
    147. Insertion Sort List(js)
    146. LRU Cache(js)
    145. Binary Tree Postorder Traversal(js)
    144. Binary Tree Preorder Traversal(js)
    143. Reorder List(js)
    142. Linked List Cycle II(js)
    141. Linked List Cycle(js)
    140. Word Break II(js)
  • 原文地址:https://www.cnblogs.com/yuyutianxia/p/3985254.html
Copyright © 2011-2022 走看看