zoukankan      html  css  js  c++  java
  • FutrueTask原理及源码分析

    1.前言

    相信很多人了解到FutureTask是因为ThreadPoolExecutor.submit方法,根据ThreadPoolExecutor.submit的使用,我们可以先猜一下FutureTask的原理。

    public static void main(String[] args) throws ExecutionException, InterruptedException {

    FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {

    @Override
    public Integer call() throws Exception {
    TimeUnit.SECONDS.sleep(3);
    return 1;
    }
    });
    new Thread(futureTask).start();

    System.out.println(futureTask.get());
    }

    上面这个代码会在启动后三秒打印出1,FutureTask.get()方法调用时会直到Callable中的代码执行完才会返回,所以FutureTask需要在这里阻塞。因为可能多个线程进行get,所以需要一个阻塞队列。

    如果Callable三秒执行完,调用方过了五秒才调用get的话,FutureTask就需要把Callable中的执行结果存起来,并且也要把异常catch住存起来,所以需要一个变量存放结果。使用一个api然后想去研究它的原理,源码时,其实可以

    先想一下,它可能是怎么做的,如果是我写应该怎样设计,这样能提高自己的设计能力。

    2.原理

    FutureTask的原理其实和前言中的猜想类似,下面简述一下FutureTask的原理。

    FutureTask有两个非常重要的方法,run方法和get方法,run方法是实现了Runnable然后在run里面跑Callable的代码,

    get方法就是我们常用的获取数据的方法。run方法运行Callable中的代码然后catch住异常,然后将正常结果或者异常结果

    存起来,并且唤醒因为调用get方法阻塞的线程。get方法是去判断是否已经计算出结果,如果计算完成,返回结果否则进行

    阻塞。

    3.源码分析

    建议大家在阅读源码时,先看一下文档,虽然文档是英文的,但是自己读一下搭配翻译看懂应该不难,这里给大家介绍一个IDEA的功能,点击View->QuickDocumentation能让文档读起来更加方便。

    下面我就分析一下源码:

      
       FutureTask中的状态维护
        private volatile int state;
        private static final int NEW          = 0; //初始状态
        private static final int COMPLETING   = 1; //执行完成但是执行结果没有保存
        private static final int NORMAL       = 2; //执行完成并且保存了结果
        private static final int EXCEPTIONAL  = 3; //出现了异常
        private static final int CANCELLED    = 4; //取消
        private static final int INTERRUPTING = 5; //打断中,可以进行打断线程了
        private static final int INTERRUPTED  = 6; //线程已经被置成打断状态
    
        private Callable<V> callable; //入参
       
        private Object outcome; //执行成功结果保存到这个变量
       
        private volatile Thread runner; //正在执行的线程
       
        private volatile WaitNode waiters;//等待队列
     如果你尝试用idea追踪者这些变量在哪里赋值了,你会发现你找不到,这是因为这些变量的赋值都是通过Unsafe类完成的,这个类会直接改这些变量内存地址上对应的值。
      Unsafe可以通过对象+字段的offset找到字段对应的内存地址从而修改数据,了解了这些,在去看FutureTask的代码就很容易了
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
    try {
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class<?> k = FutureTask.class;
    stateOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("state"));
    runnerOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("runner"));
    waitersOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("waiters"));
    } catch (Exception e) {
    throw new Error(e);
    }
    }

    下面看一下run方法是怎样执行的

     public void run() {
          //runner置成当前线程
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran;
              //成功设置result失败设置Exception
    try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

    下面看看成功都做了些什么

    protected void set(V v) {
          //执行成功后状态扭转成完成中,扭转成功后将值存入outcome然后执行finishCompletion
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }

    下面看看失败做了什么

        protected void setException(Throwable t) {
          //与成功类似不再多讲
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }

    成功和失败都执行了finishCompletion,下面看看这个方法里干了什么

        /**
         * Removes and signals all waiting threads, invokes done(), and
         * nulls out callable.
         */

      注释已经非常清楚了。唤醒等待的节点,执行done,将callable置成null
    private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc //这里为什么能帮助gc呢,如果q在老年代,q.next在年轻代的话就可以了,详情看https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6806875 q = next; } break; } } done(); callable = null; // to reduce footprint }

    到这里run方法已经很清楚了,下面看一下get方法

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
            //很明显需要看这个方法,记住这个传参false s
    = awaitDone(false, 0L); return report(s); }
    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        //timed = false
    final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) {
            //线程已经被打断了
    if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }        int s = state;
            // 已经完成了返回状态
    if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet //这里直接让出线程,让runner去赋值 Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) //加入队列 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { //ture的话 等待一段时间。false的话直接阻塞 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }

    这里如果阻塞了,就等run方法执行完成的释放了,代码逻辑很清晰,jdk并发包中的实现用了很多for(;;)这其实是作者写C的习惯的while(true)会多一些指令,在java中编译成

    字节码这两个是完全一样的。下面看一下获取到状态后执行的report方法

      //正常直接返回结果,异常封装一下抛出,这里有个退出,退出的代码这里就不再继续分析了,看完上述的分析,相信你也能快速看懂退出的代码
    private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

    4.使用

    在实际开发中,大部分情况都要用到db,http,rpc这些IO操作,在一个方法中需要多次进行这些操作时,如果没有前后关联,可以使用Future充分

    使用多核cpu,比如你需要查多个表拼接成一个VO返回给前端,就可以用Future提高接口的响应时间。

  • 相关阅读:
    Android 目前最稳定和高效的UI适配方案
    寄Android开发Gradle你需要知道的知识
    Android精讲--界面编程5(AdapterView及其子类)
    Android精讲--界面编程4(ImageView及其子类)
    Android精讲--界面编程3(TextView及其子类)
    Android精讲--界面编程2(布局管理器)
    Android的基类Context和View
    Android里的前端界面
    Android的活动Activity
    Android基础入门
  • 原文地址:https://www.cnblogs.com/zhandouBlog/p/11336225.html
Copyright © 2011-2022 走看看