zoukankan      html  css  js  c++  java
  • java中多线程之Future入门

    前言

    Future可以看做一个异步的计算结果的票据,类似我们排队过程中获取的号,后面根据这个号去操作。

    简单使用

    Future需要配合Callable接口和线程池使用

    @FunctionalInterface
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    

    Callable就是有返回值的Runnable。

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class Client {
    
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<Integer> future = executorService.submit(() -> {
          System.out.println("start execute");
          try {
            Thread.sleep(2_000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println("end execute");
          return 5;
        });
        executorService.shutdown();
        System.out.println(future.get());
      }
    
    }
    

    创建一个线程池,提交一个Callable,返回一个Future,get()方法会使当前线程阻塞,直到Callable执行结束。

    原理分析

    public abstract interface Future<V> {
     public abstract boolean cancel(boolean paramBoolean);
     public abstract boolean isCancelled();
     public abstract boolean isDone();
     public abstract V get() throws InterruptedException, ExecutionException;
     public abstract V get(long paramLong, TimeUnit paramTimeUnit)
     throws InterruptedException, ExecutionException, TimeoutException; 
    }
    

    Future在等待结果过程中可以取消,但不一定取消成功,因为任务可能已经完成。
    java提供了一个Future的实现类FutureTask

    /**
     *
     * state可能的状态转变路径如下:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;
    

    内部维护一个运行状态,刚开始为NEW,正常结束为NORMAL,异常结束为EXCEPTIONAL

    /** The underlying callable; nulled out after running */
        private Callable<V> callable;
        /** The result to return or exception to throw from get() */
        private Object outcome; // non-volatile, protected by state reads/writes
        /** The thread running the callable; CASed during run() */
        private volatile Thread runner;
        /** Treiber stack of waiting threads */
        private volatile WaitNode waiters;
    

    使用一个Treiber栈来存储多个阻塞线程,可以简单看做一个线程安全且高效的栈。

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    
    /**
         * Awaits completion or aborts on interrupt or timeout.
         *
         * @param timed true if use timed waits
         * @param nanos time to wait, if timed
         * @return state upon completion or at timeout
         */
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            long startTime = 0L;    // Special value 0L means not yet parked
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING)
                    // We may have already promised (via isDone) that we are done
                    // so never return empty-handed or throw InterruptedException
                    Thread.yield();
                else if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                else if (q == null) {
                    if (timed && nanos <= 0L)
                        return s;
                    q = new WaitNode();
                }
                else if (!queued)
                    queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
                else if (timed) {
                    final long parkNanos;
                    if (startTime == 0L) { // first time
                        startTime = System.nanoTime();
                        if (startTime == 0L)
                            startTime = 1L;
                        parkNanos = nanos;
                    } else {
                        long elapsed = System.nanoTime() - startTime;
                        if (elapsed >= nanos) {
                            removeWaiter(q);
                            return state;
                        }
                        parkNanos = nanos - elapsed;
                    }
                    // nanoTime may be slow; recheck before parking
                    if (state < COMPLETING)
                        LockSupport.parkNanos(this, parkNanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    

    调用get方法时,如果任务未完成,将当前线程加入等待队列并阻塞。使用CAS无锁技术来修改内部状态和等待队列。更多原理请参考 FutureTask源码分析

    参考

    FutureTask源码分析
    FutureTask中Treiber堆的实现
    理解与使用Treiber Stack

  • 相关阅读:
    where T: class的解释
    调用钉钉的WebAPI接口实现与ERP数据的同步
    Json序列化和反序列化的方式
    Log4Net日志处理
    MVC项目中异常处理
    FindBI商业智能报表工具
    权限列表实现
    委托,匿名,lambda
    [经典贪心算法]贪心算法概述
    [zt]手把手教你写对拍程序(PASCAL)
  • 原文地址:https://www.cnblogs.com/strongmore/p/14966439.html
Copyright © 2011-2022 走看看