zoukankan      html  css  js  c++  java
  • JUC锁框架源码阅读-Future和Callable

    使用方法

     public static void main(String[] args) throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            /**
             * 异步执行耗时统计操作
             */
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    //模拟耗时统计
                    Thread.sleep(5000);
                    return 5;
                }
            });
            //不用串行等待统计 先执行其他数据组装
            System.out.println("组装数据1");
            System.out.println("组装数据2");
            //如果统计操作没有执行完则等待
            Integer i= (Integer) future.get();
            //统一返回结果
            System.out.println("返回组装数据,耗时统计:"+i);
        }

    我们往往都是配合线程池使用。继续往下看

    <1>submit

    java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>)

       public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    java.util.concurrent.AbstractExecutorService#newTaskFor

     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }

    可以发现我们最终返的FutureTask

    类图

     

    可以看到FutureTask实现了Runnable

    RunnableFuture接口定义

     /**
         * runnable和Future都需要具体实现类实现
         * @param <V>
         */
        public interface RunnableFuture<V> extends Runnable, Future<V> {
            void run();
        

    Future接口定义

    public interface Future<V> {
    
        /**
         * 取消当前的Future。会唤醒所有等待结果值的线程,抛出CancellationException异常
         * @param mayInterruptIfRunning 是否中断 计算结果值的那个线程
         * @return 返回true表示取消成功
         */
        boolean cancel(boolean mayInterruptIfRunning);
    
        // 当前的Future是否被取消,返回true表示已取消。
        boolean isCancelled();
    
        // 当前Future是否已结束。包括运行完成、抛出异常以及取消,都表示当前Future已结束
        boolean isDone();
    
        // 获取Future的结果值。如果当前Future还没有结束,那么当前线程就等待,
        // 直到Future运行结束,那么会唤醒等待结果值的线程的。
        V get() throws InterruptedException, ExecutionException;
    
        // 获取Future的结果值。与get()相比较多了允许设置超时时间。
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

    Callable接口定义

    public interface Callable<V> {
        /**
         * 与runable不同的是增加了返回值     *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }

    FutureTask状态定义

        // 表示FutureTask当前的状态
        private volatile int state;
        // NEW 新建状态,表示这个FutureTask还没有开始运行
        private static final int NEW          = 0;
        // COMPLETING 完成状态, 表示FutureTask任务已经计算完毕了,
        // 但是还有一些后续操作,例如唤醒等待线程操作,还没有完成。
        private static final int COMPLETING   = 1;
        // FutureTask任务完结,正常完成,没有发生异常
        private static final int NORMAL       = 2;
        // FutureTask任务完结,因为发生异常。
        private static final int EXCEPTIONAL  = 3;
        // FutureTask任务完结,因为取消任务
        private static final int CANCELLED    = 4;
        // FutureTask任务完结,也是取消任务,不过发起了中断运行任务线程的中断请求。
        private static final int INTERRUPTING = 5;
        // FutureTask任务完结,也是取消任务,已经完成了中断运行任务线程的中断请求。
        private static final int INTERRUPTED  = 6;

    FutureTask

    run

      public void run() {
            /**
             * 1.如果状态state不是NEW
             * 2.cas设置runner失败,表示已经由其他线程抢先设置了
             * 保证了只有一个线程可以运行try 代码块中的代码。
             */
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                return;
            try {
                //获得Callable具体任务是吸纳
                Callable<V> c = callable;
                //如果Callable任务不为空 同时任务还没有开始过执行callable任务
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        //执行call任务 发挥具体值
                        result = c.call();
                        //执行完毕
                        ran = true;
                    } catch (Throwable ex) {
                        //出现异常
                        result = null;
                        ran = false;
                        //<1>设置异常 设置状态state为EXCEPTIONAL 同时唤醒其他等待结果的线程
                        setException(ex);
                    }
                    //<2>执行成保存结果 设置sate为COMPLETING 同时唤醒其他等待结果的线程
                    if (ran)
                        set(result);
                }
            } finally {
                runner = null;
                int s = state;
                // 当状态大于或等于INTERRUPTING,调用handlePossibleCancellationInterrupt方法,
                // 等待别的线程将状态设置成INTERRUPTED
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

    <1>setException

     protected void setException(Throwable t) {
            //CAS将状态改为COMPLETING
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                //CAS将状态改为EXCEPTIONAL
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }

    <2>set

       protected void set(V v) {
            //cas修改状态为COMPLETING
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                //保存结果
                outcome = v;
                //CAS修改状态为NORMAL
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                //唤醒等待线程
                finishCompletion();
            }
        }

    <3>finishCompletion

     private void finishCompletion() {
            // 循环唤醒等待线程
            for (FutureTask.WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        FutureTask.WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
           //空实现 子类可以实现
            done();
    
            callable = null;        // to reduce footprint
        }

    get

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            //表示未开始执行 或者 执行完成还没组装完结果
            if (s <= COMPLETING)
                /<1>自旋根据状态获取结果,判断是否阻塞,默认没有超时时间
                s = awaitDone(false, 0L);
            //<4>根据状态判断是否发生异常 是抛出异常还是 正常返回结果
            return report(s);
        }

    <1>awaitDone

     private int awaitDone(boolean timed, long nanos)
                throws InterruptedException {
            //如果有超时时间计算截止时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            FutureTask.WaitNode q = null;
            // 节点是否已添加
            boolean queued = false;
            for (;;) {
                // 如果当前线程中断标志位是true,
                // <3>那么从列表中移除节点q,并抛出InterruptedException异常
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                //获得状态
                int s = state;
                // 当状态大于COMPLETING时,表示FutureTask任务已结束。
                if (s > COMPLETING) {
                    if (q != null)
                        // 将节点q线程设置为null,因为线程没有阻塞等待
                        q.thread = null;
                    return s;
                }
                // 表示还有一些后序操作没有完成,那么当前线程让出执行权
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    // 使用当前线程创建节点p
                    q = new FutureTask.WaitNode();
                else if (!queued)
                    // 使用CAS函数将新节点添加到链表中,如果添加失败,那么queued为false,
                    // 下次循环时,会继续添加,直到成功。
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                            q.next = waiters, q);
                else if (timed) {
                    // timed为true表示需要设置超时
                    nanos = deadline - System.nanoTime();
                    //超时时间已经过了 则从队列移除
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    //现在阻塞 指定时间 等待唤醒
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    //线程阻塞 等待唤醒
                    LockSupport.park(this);
            }
        }

    <3>removeWaiter

        // 从链表中删除节点node
        private void removeWaiter(WaitNode node) {
            if (node != null) {
                // 将thread设置null
                node.thread = null;
                retry:
                for (;;) {          // restart on removeWaiter race
                    for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                        // 记录当前节点q的下一个节点s
                        s = q.next;
                        // 如果当前节点q的thread不等于null,那么就用pred记录q
                        if (q.thread != null)
                            pred = q;
                        // 如果当前节点q的thread等于null,且pred不等于null
                        else if (pred != null) {
                            // 删除当前节点q
                            pred.next = s;
                            // 如果pred.thread == null,那么继续retry的for循环
                            if (pred.thread == null) // check for race
                                continue retry;
                        }
                        else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                              q, s))
                            continue retry;
                    }
                    break;
                }
            }
        }

     <4>report

    private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
  • 相关阅读:
    在线心理测试脚本
    素数
    设置层叠效果
    年轻,无权享受...
    Unity3D之预设
    Json解析类
    php 正则表达式
    php 字符串处理
    php 基础语法
    SQL 函数
  • 原文地址:https://www.cnblogs.com/LQBlog/p/15233692.html
Copyright © 2011-2022 走看看