zoukankan      html  css  js  c++  java
  • java:ExecutorService与Executors例子的简单剖析

     

            对于多线程有了一点了解之后,那么来看看java.lang.concurrent包下面的一些东西。在此之前,我们运行一个线程都是显式调用了Thread的start()方法。我们用concurrent下面的类来实现一下线程的运行,而且这将成为以后常用的方法或者实现思路。 

            看一个简单的例子: 

    1. public class CacheThreadPool {  
    2.     public static void main(String[] args) {  
    3.         ExecutorService exec=Executors.newCachedThreadPool();  
    4.         for(int i=0;i<5;i++)  
    5.             exec.execute(new LiftOff());  
    6.         exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  
    7.     }  
    8. }  


            这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。 

            一、ExecutorService 
            先看看ExecutorService,这是一个接口,简单的列一下这个接口: 

    1. public interface ExecutorService extends Executor {  
    2.   
    3.     void shutdown();  
    4.   
    5.     List<Runnable> shutdownNow();  
    6.   
    7.     boolean isShutdown();  
    8.   
    9.     boolean isTerminated();  
    10.   
    11.     boolean awaitTermination(long timeout, TimeUnit unit)  
    12.   
    13.     <T> Future<T> submit(Callable<T> task);  
    14.   
    15.     <T> Future<T> submit(Runnable task, T result);  
    16.   
    17.     Future<?> submit(Runnable task);  
    18.   
    19.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
    20.   
    21.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  
    22.   
    23.     <T> T invokeAny(Collection<? extends Callable<T>> tasks)  
    24.   
    25.     <T> T invokeAny(Collection<? extends Callable<T>> tasks,  
    26.                     long timeout, TimeUnit unit)  
    27. }  


            ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法: 

    1. void execute(Runnable command)  



            二、Executors 
            Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用: 
           

                Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods: 

     

             
    • Methods that create and return an ExecutorService set up with commonly useful configuration settings.       
    • Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.       
    • Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.       
    • Methods that create and return a ThreadFactory that sets newly created threads to a known state.       
    • Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.       


            在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法: 

    1. public static ExecutorService newCachedThreadPool() {  
    2.         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
    3.                                       60L, TimeUnit.SECONDS,  
    4.                                       new SynchronousQueue<Runnable>());  
    5.     }  


            在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到  
    4.     ..........  
    5. }  


            ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。 

            线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     public void execute(Runnable command) {  
    4.         if (command == null)  
    5.             throw new NullPointerException();  
    6.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
    7.             if (runState == RUNNING && workQueue.offer(command)) {  
    8.                 if (runState != RUNNING || poolSize == 0)  
    9.                     ensureQueuedTaskHandled(command);  
    10.             }  
    11.             else if (!addIfUnderMaximumPoolSize(command))  
    12.                 reject(command); // is shutdown or saturated  
    13.         }  
    14.     }  
    15.     ..........  
    16. }  


            根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是: 
            1、workQueue.offer(command) 
            workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。 
            2、ensureQueuedTaskHandled(command) 
            这个是线程执行的关键语句。看看它的源码: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     private void ensureQueuedTaskHandled(Runnable command) {  
    4.         final ReentrantLock mainLock = this.mainLock;  
    5.         mainLock.lock();  
    6.         boolean reject = false;  
    7.         Thread t = null;  
    8.         try {  
    9.             int state = runState;  
    10.             if (state != RUNNING && workQueue.remove(command))  
    11.                 reject = true;  
    12.             else if (state < STOP &&  
    13.                      poolSize < Math.max(corePoolSize, 1) &&  
    14.                      !workQueue.isEmpty())  
    15.                 t = addThread(null);  
    16.         } finally {  
    17.             mainLock.unlock();  
    18.         }  
    19.         if (reject)  
    20.             reject(command);  
    21.         else if (t != null)  
    22.             t.start();  
    23.     }  
    24.     ..........  
    25. }  


            在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     private Thread addThread(Runnable firstTask) {  
    4.         Worker w = new Worker(firstTask);  
    5.         Thread t = threadFactory.newThread(w);  
    6.         if (t != null) {  
    7.             w.thread = t;  
    8.             workers.add(w);  
    9.             int nt = ++poolSize;  
    10.             if (nt > largestPoolSize)  
    11.                 largestPoolSize = nt;  
    12.         }  
    13.         return t;  
    14.     }  
    15.     ..........  
    16. }  


            这里两个重点,很明显: 
            1、Worker w = new Worker(firstTask) 
            2、Thread t = threadFactory.newThread(w) 
            先看Worker是个什么结构: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     private final class Worker implements Runnable {  
    4.         ..........  
    5.         Worker(Runnable firstTask) {  
    6.             this.firstTask = firstTask;  
    7.         }  
    8.   
    9.         private Runnable firstTask;  
    10.         ..........  
    11.   
    12.         public void run() {  
    13.             try {  
    14.                 Runnable task = firstTask;  
    15.                 firstTask = null;  
    16.                 while (task != null || (task = getTask()) != null) {  
    17.                     runTask(task);  
    18.                     task = null;  
    19.                 }  
    20.             } finally {  
    21.                 workerDone(this);  
    22.             }  
    23.         }  
    24.     }  
    25.   
    26.     Runnable getTask() {  
    27.         for (;;) {  
    28.             try {  
    29.                 int state = runState;  
    30.                 if (state > SHUTDOWN)  
    31.                     return null;  
    32.                 Runnable r;  
    33.                 if (state == SHUTDOWN)  // Help drain queue  
    34.                     r = workQueue.poll();  
    35.                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  
    36.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  
    37.                 else  
    38.                     r = workQueue.take();  
    39.                 if (r != null)  
    40.                     return r;  
    41.                 if (workerCanExit()) {  
    42.                     if (runState >= SHUTDOWN) // Wake up others  
    43.                         interruptIdleWorkers();  
    44.                     return null;  
    45.                 }  
    46.                 // Else retry  
    47.             } catch (InterruptedException ie) {  
    48.                 // On interruption, re-check runState  
    49.             }  
    50.         }  
    51.     }  
    52.     }  
    53.     ..........  
    54. }  


            Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
            在看看newThread是一个什么方法: 

    1. public class Executors {  
    2.     ..........  
    3.     static class DefaultThreadFactory implements ThreadFactory {  
    4.         ..........  
    5.         public Thread newThread(Runnable r) {  
    6.             Thread t = new Thread(group, r,  
    7.                                   namePrefix + threadNumber.getAndIncrement(),  
    8.                                   0);  
    9.             if (t.isDaemon())  
    10.                 t.setDaemon(false);  
    11.             if (t.getPriority() != Thread.NORM_PRIORITY)  
    12.                 t.setPriority(Thread.NORM_PRIORITY);  
    13.             return t;  
    14.         }  
    15.         ..........  
    16.     }  
    17.     ..........  
    18. }  


            通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。 

            之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。 
            从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。

  • 相关阅读:
    【BIEE】清除缓存
    【BIEE】安装好BIEE后,修改默认登录页面不为QuickStart页面
    【BIRT】修改BIRT的背景颜色
    【Oracle】查看死锁与解除死锁
    【Oracle】查看正在运行的存储过程
    【Oracle】表空间相关集合
    【Oracle】Oracle常用语句集合
    MapWindowPoints
    模拟鼠标操作
    SVN MERGE 方法(原创)
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13318138.html
Copyright © 2011-2022 走看看