zoukankan      html  css  js  c++  java
  • Java多线程系列 JUC线程池05 线程池原理解析(四)

    转载 http://www.cnblogs.com/skywang12345/p/3544116.html  https://blog.csdn.net/programmer_at/article/details/79799267

    Executor执行Callable任务

      Callable 和 Future 是比较有趣的一对组合。当我们需要获取线程的执行结果时,就需要用到它们。Callable用于产生结果,Future用于获取结果。

    1. Callable

    Callable 是一个接口,它只包含一个call()方法。Callable是一个返回结果并且可能抛出异常的任务。为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法。

    Callable的源码如下:

    public interface Callable<V> {
        V call() throws Exception;
    }

    说明:从中我们可以看出Callable支持泛型。

    2. Future

    Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。

    Future的源码如下:

    public interface Future<V> {
        // 试图取消对此任务的执行。
        boolean     cancel(boolean mayInterruptIfRunning)
    
        // 如果在任务正常完成前将其取消,则返回 true。
        boolean     isCancelled()
    
        // 如果任务已完成,则返回 true。
        boolean     isDone()
    
        // 如有必要,等待计算完成,然后获取其结果。
        V           get() throws InterruptedException, ExecutionException;
    
        // 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
        V             get(long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException;
    }

    说明: Future用于表示异步计算的结果。它的实现类是FutureTask,在讲解FutureTask之前,我们先看看Callable, Future, FutureTask它们之间的关系图,如下:

     

    说明
    (01) RunnableFuture是一个接口,它继承了Runnable和Future这两个接口。RunnableFuture的源码如下:

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }

    (02) FutureTask实现了RunnableFuture接口。所以,我们也说它实现了Future接口。

    示例和源码分析

    我们先通过一个示例看看Callable和Future的基本用法,然后再分析示例的实现原理。

    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ExecutionException;
    
    class MyCallable implements Callable {
    
        @Override 
        public Integer call() throws Exception {
            int sum    = 0;
            // 执行任务
            for (int i=0; i<100; i++)
                sum += i;
            //return sum; 
            return Integer.valueOf(sum);
        } 
    }
    
    public class CallableTest1 {
    
        public static void main(String[] args) 
            throws ExecutionException, InterruptedException{
            //创建一个线程池
            ExecutorService pool = Executors.newSingleThreadExecutor();
            //创建有返回值的任务
            Callable c1 = new MyCallable();
            //执行任务并获取Future对象 
            Future f1 = pool.submit(c1);
            // 输出结果
            System.out.println(f1.get()); 
            //关闭线程池 
            pool.shutdown(); 
        }
    }

    运行结果

    4950

    结果说明
      在主线程main中,通过newSingleThreadExecutor()新建一个线程池。接着创建Callable对象c1,然后再通过pool.submit(c1)将c1提交到线程池中进行处理,并且将返回的结果保存到Future对象f1中。然后,我们通过f1.get()获取Callable中保存的结果;最后通过pool.shutdown()关闭线程池。

     

    1. submit任务,等待线程池execute 
    1. 执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并阻塞等待运行结果; 
    2. FutureTask任务执行完成后,通过UNSAFE设置waiters相应的waitNode为null,并通过LockSupport类unpark方法唤醒主线程;

    在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。 
    1. Callable接口类似于Runnable,只是Runnable没有返回值。 
    2. Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果; 
    3. Future.get方法会导致主线程阻塞,直到Callable任务执行完成;

    1. submit()

    submit()在ExecutorService.java中的定义:

    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);
    
    Future<?> submit(Runnable task);

    submit()在AbstractExecutorService.java中实现,AbstractExecutorService.submit()实现了ExecutorService.submit(),并且可以获取执行完的返回值, 而ThreadPoolExecutor是AbstractExecutorService.submit()的子类,所以submit方法也是ThreadPoolExecutor的方法,它的源码如下:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 创建一个RunnableFuture对象
        RunnableFuture<T> ftask = newTaskFor(task);
        // 执行“任务ftask”
        execute(ftask);
        // 返回“ftask”
        return ftask;
    }

    说明:submit()通过newTaskFor(task)创建了RunnableFuture对象ftask。它的源码如下:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;

    2. FutureTask的构造函数

    FutureTask的内部状态及构造函数如下:

    public class FutureTask<V> implements RunnableFuture<V> {
    
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            // callable是一个Callable对象
            this.callable = callable;
            // state记录FutureTask的状态
            this.state = NEW;       // ensure visibility of callable
         }
    }

    3. FutureTask的run()方法

    我们继续回到submit()的源码中。
    在newTaskFor()新建一个ftask对象之后,会通过execute(ftask)执行该任务。此时ftask被当作一个Runnable对象进行执行,最终会调用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中实现,源码如下:

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            // 将callable对象赋值给c。
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行Callable的call()方法,并保存结果到result中。
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // 如果运行成功,则将result保存
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            // 设置“state状态标记”
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    说明:FutureTask.run方法是在线程池中被执行的,而非主线程 
    1. 通过执行Callable任务的call方法; 
    2. 如果call执行成功,则通过set方法保存结果,之后调用FutureTask的get()方法,返回的就是通过set(result)保存的值; 
    3. 如果call执行有异常,则通过setException保存异常;

    4. get方法

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
    } 

    内部通过awaitDone方法对主线程进行阻塞,具体实现如下:

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }

    说明:

    1. 如果主线程被中断,则抛出中断异常;
    2. 判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;
    3. 如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;
    4. 通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;
    5. 最终通过LockSupport的park或parkNanos挂起线程;
  • 相关阅读:
    CentOS6+nginx+uwsgi+mysql+django1.6.6+python2.6.6
    CentOS 6.5下安装Python+Django+Nginx+uWSGI
    python学习之旅
    Gitlab安装操作说明书
    快速上手git gitlab协同合作
    在centos6.3用yum安装redis
    CentOS 6.5 下安装 Redis 2.8.7
    Ruby Gems更换淘宝源方法
    Apache 日志分析(一)
    Apache 日志分析(二)
  • 原文地址:https://www.cnblogs.com/lizhouwei/p/9119074.html
Copyright © 2011-2022 走看看