zoukankan      html  css  js  c++  java
  • FutureTask

    从前创建线程的弊端

    • 没有返回值
    • 不能捕获异常

    现在使用Callable+FutureTask既可以有返回值,也可以捕获异常

    当有了返回值后,我们就可以不用一直等着线程的结果,而是可以先干点别的事情,最后凭future获取结果,例如星期六你去蛋糕店做蛋糕,店员给你一张小票,你这时候可以先去看一部电影,看完回到蛋糕店凭小票领取蛋糕即可,这样就可以省去你等待做蛋糕花费的时间,这里的future就相当于小票

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    
    public class FutureTaskDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Callable<Integer> call = () -> {
                System.out.println("正在计算结果。。。");
                Thread.sleep(1000);
                return 1;
            };
    
            FutureTask<Integer> futureTask = new FutureTask<>(call);
            new Thread(futureTask).start();
            // 做点别的事情
            System.out.println("do something...");
    
            System.out.println("拿到的结果为 " + futureTask.get());
        }
    }
    

    源码解析

    1. 在new FutureTask的时候需要传入Callable接口,那么先去看看FutureTask的构造方法:

      public FutureTask(Callable<V> callable) {
          if (callable == null)
              throw new NullPointerException();
          this.callable = callable;    // 这里可以看到,FutureTask自己维护了一个Callable变量
          this.state = NEW;       // ensure visibility of callable
      }
      
    2. 然后我们需要将FutureTask对象传入Thread类中,那么在Thread类中的run()方法里面又发生了什么呢?

      public void run() {
          if (target != null) {
              target.run();    // 执行了target的run()方法
          }
      }
      

      其实在Thread类中的run()方法很简单,只是执行了target的run()方法,这个target就是我们在构建Thread对象时传入的Runnable对象,在这里就是FutureTask对象(FutureTask继承了Runnable)

    3. 因此直接去FutureTask的run()方法查看

      public void run() {
          if (state != NEW ||
              !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                           null, Thread.currentThread()))
              return;
          try {
              Callable<V> c = callable;   // 将自己维护的Callable对象赋值给c
              if (c != null && state == NEW) {
                  V result;
                  boolean ran;
                  try {
                      // 在这里执行了Callable的run()方法,也就是我们自己写的run()方法,并且拿到了返回结果
                      result = c.call();      
                      ran = true;
                  } catch (Throwable ex) {
                      result = null;
                      ran = false;
                      setException(ex);
                  }
                  if (ran)
                      set(result);   // 在这里将返回的结果设置供set()方法获取调用
              }
          } finally {
              // runner must be non-null until state is settled to
              // prevent concurrent calls to run()
              runner = null;
              // state must be re-read after nulling runner to prevent
              // leaked interrupts
              int s = state;
              if (s >= INTERRUPTING)
                  handlePossibleCancellationInterrupt(s);
          }
      }
      

      深入解析set()以及get()方法

      首先,需要先了解FutureTask类里面定义的几个状态,当FutureTask被new出来的时候,就会把state设置为NEW状态,表示这是新创建的任务

      private volatile int state;
      private static final int NEW          = 0;
      private static final int COMPLETING   = 1;
      private static final int NORMAL       = 2;
      private static final int EXCEPTIONAL  = 3;
      private static final int CANCELLED    = 4;
      private static final int INTERRUPTING = 5;
      private static final int INTERRUPTED  = 6;
      

      下面可以看set()跟get()方法了

      set()方法

      protected void set(V v) {
          if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  // 将状态修改为COMPLETING
              outcome = v;  // 将执行结果设置给FutureTask维护的outcome变量
              // 如果一切设置成功,就将任务状态设置为NORMAL状态,这便是已经完成了
              UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
              // 当设置成功后,就会执行这个方法,将所有先前执行了FutureTask.get()方法的线程唤醒(因为在之前代码还未执行完,没有得到结果,所以执行get()的线程都会被等待,一知道结果出来为止)
              finishCompletion();
          }
      }
      
      
      
      private void finishCompletion() {
          // 这里的WaitNode里面存储了对应的线程,这是一个链式结构,在这一步会一个一个遍历唤醒
          for (WaitNode q; (q = waiters) != null;) {
              if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                  for (;;) {
                      Thread t = q.thread;
                      if (t != null) {
                          q.thread = null;
                          LockSupport.unpark(t);   // 主要是在这一步对线程进行唤醒
                      }
                      // 获取到下一个节点
                      WaitNode next = q.next;
                      if (next == null)
                          break;
                      q.next = null;
                      q = next;
                  }
                  break;
              }
          }
          // 这个done方法并未在FutureTask中有任何有效的代码,如下。我猜测是一个钩子函数,等set执行完就会立即执行
          // 这个done可以自己实现,可以自定义一个类继承FutureTask或者用匿名内部类实现
          done();   
          callable = null;
      }
      
      protected void done() { }
      
      
      // 实现如下
      FutureTask<Integer> futureTask = new FutureTask<Integer>(call) {
          @Override
          protected void done() {
              System.out.println("=======完成了");
          }
      };
      

      get()方法

      public V get() throws InterruptedException, ExecutionException {
          int s = state;
          if (s <= COMPLETING)   // 假设状态值小于COMPLETING,也就是说任务还没有完成,就执行下面的等待方法
              s = awaitDone(false, 0L);  // 如果任务没有执行完,那么会将当前线程等待在这里
          return report(s);   // 返回处理结果,方法在下面展示
      }
      
      
      
      
      private int awaitDone(boolean timed, long nanos)
          throws InterruptedException {
          final long deadline = timed ? System.nanoTime() + nanos : 0L;
          WaitNode q = null;
          boolean queued = false;
          for (;;) {
              if (Thread.interrupted()) {
                  removeWaiter(q);   // 如果线程被中断了,就把所有的WaitNode移除
                  throw new InterruptedException();
              }
      
              int s = state;
              if (s > COMPLETING) {   // 如果任务完成,直接返回状态
                  if (q != null)
                      q.thread = null;
                  return s;
              }
              else if (s == COMPLETING)  // 如果已经完成了任务的执行,就没要继续占用cpu的执行权,让出即可
                  // yield()方法会通知线程调度器放弃对处理器的占用,但调度器可以忽视这个通知。yield()方法主要是为了保障线程间调度的连续性,防止某个线程一直长时间占用cpu资源。
                  Thread.yield();
              else if (q == null)
                  // 如果当前访问get()方法的线程还没有对应的WaitNode,就创建
                  // 而且会把当前线程放入WaitNode中
                  q = new WaitNode();  
              else if (!queued)
                  queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                       q.next = waiters, q);
              else if (timed) {  // 如果设置了超时时间,进行检测,如果超时了就一处所有的WaitNode
                  nanos = deadline - System.nanoTime();
                  if (nanos <= 0L) {
                      removeWaiter(q);
                      return state;
                  }
                  LockSupport.parkNanos(this, nanos);
              }
              else
                  // 如果任务状态小于COMPLETING,也就是还没有执行完,那么就把当前线程阻塞,这也是这个方法的关键
                  LockSupport.park(this);    
          }
      }
      
      
      private V report(int s) throws ExecutionException {
          // 这个outcome是FutureTask维护的变量,在先前的set()方法中已经将结果设置给outcome
          Object x = outcome;   
          if (s == NORMAL)
              return (V)x;  // 正常执行完,返回结果
          if (s >= CANCELLED)   // 如果非正常执行完,抛出异常
              throw new CancellationException();
          // 此处会将异常转为Throwable,是所有异常的基类
          throw new ExecutionException((Throwable)x);
      }
      
      
      
      // 这是WaitNode的实现
      static final class WaitNode {
          volatile Thread thread;
          volatile WaitNode next;
          // 每次创建,都会把当前正在执行的线程存进来
          WaitNode() { thread = Thread.currentThread(); }
      }
      

      以上就是FutureTask基本的一些api的源码解读

    Fork/Join框架

    背景

    示例

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    public class ForkJoinDemo extends RecursiveTask<Integer> {
        private Integer begin;
        private Integer end;
    
        public ForkJoinDemo(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }
    
        @Override
        protected Integer compute() {
            int sum = 0;
            if (end - begin <= 2) {
                // 计算
                for (int i = begin; i <= end; i++) {
                    sum += i;
                }
            } else {
                // 拆分任务
                ForkJoinDemo forkJoinDemo = new ForkJoinDemo(begin, (begin + end) / 2);
                ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo((begin + end) / 2 + 1, end);
    
                // 执行子任务
                forkJoinDemo.fork();
                forkJoinDemo2.fork();
    
                Integer join = forkJoinDemo.join();  // 获取执行结果
                Integer join1 = forkJoinDemo2.join();
                sum = join + join1;
            }
            return sum;
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinTask<Integer> future = pool.submit(new ForkJoinDemo(1, 100));
            System.out.println("。。。。");
            System.out.println("计算的结果:" + future.get());
        }
    }
    
  • 相关阅读:
    Thread中带参方法无法使用之解决方案
    项目相关的风险要素及分类
    AspNetPager分页示例之DataGrid(PostBack分页)
    Substitution 类 (asp.net 2.0 )
    自定义HTTP处理程序显示图片(asp.net 2.0)
    常见文件扩展名和它们的说明
    基于.NET的开源GIS项目(转)
    项目开发流程标准
    AOP(Aspect Oriented Programming) 面向方面编程
    项目实施及管理标准
  • 原文地址:https://www.cnblogs.com/Myarticles/p/12046072.html
Copyright © 2011-2022 走看看