zoukankan      html  css  js  c++  java
  • java:ExecutorService与Executors例子的简单剖析

     

            对于多线程有了一点了解之后,那么来看看java.lang.concurrent包下面的一些东西。在此之前,我们运行一个线程都是显式调用了Thread的start()方法。我们用concurrent下面的类来实现一下线程的运行,而且这将成为以后常用的方法或者实现思路。 

            看一个简单的例子: 

    1. public class CacheThreadPool {  
    2.     public static void main(String[] args) {  
    3.         ExecutorService exec=Executors.newCachedThreadPool();  
    4.         for(int i=0;i<5;i++)  
    5.             exec.execute(new LiftOff());  
    6.         exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  
    7.     }  
    8. }  


            这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。 

            一、ExecutorService 
            先看看ExecutorService,这是一个接口,简单的列一下这个接口: 

    1. public interface ExecutorService extends Executor {  
    2.   
    3.     void shutdown();  
    4.   
    5.     List<Runnable> shutdownNow();  
    6.   
    7.     boolean isShutdown();  
    8.   
    9.     boolean isTerminated();  
    10.   
    11.     boolean awaitTermination(long timeout, TimeUnit unit)  
    12.   
    13.     <T> Future<T> submit(Callable<T> task);  
    14.   
    15.     <T> Future<T> submit(Runnable task, T result);  
    16.   
    17.     Future<?> submit(Runnable task);  
    18.   
    19.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
    20.   
    21.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  
    22.   
    23.     <T> T invokeAny(Collection<? extends Callable<T>> tasks)  
    24.   
    25.     <T> T invokeAny(Collection<? extends Callable<T>> tasks,  
    26.                     long timeout, TimeUnit unit)  
    27. }  


            ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法: 

    1. void execute(Runnable command)  



            二、Executors 
            Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用: 
           

                Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods: 

     

             
    • Methods that create and return an ExecutorService set up with commonly useful configuration settings.       
    • Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.       
    • Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.       
    • Methods that create and return a ThreadFactory that sets newly created threads to a known state.       
    • Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.       


            在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法: 

    1. public static ExecutorService newCachedThreadPool() {  
    2.         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
    3.                                       60L, TimeUnit.SECONDS,  
    4.                                       new SynchronousQueue<Runnable>());  
    5.     }  


            在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到  
    4.     ..........  
    5. }  


            ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。 

            线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     public void execute(Runnable command) {  
    4.         if (command == null)  
    5.             throw new NullPointerException();  
    6.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
    7.             if (runState == RUNNING && workQueue.offer(command)) {  
    8.                 if (runState != RUNNING || poolSize == 0)  
    9.                     ensureQueuedTaskHandled(command);  
    10.             }  
    11.             else if (!addIfUnderMaximumPoolSize(command))  
    12.                 reject(command); // is shutdown or saturated  
    13.         }  
    14.     }  
    15.     ..........  
    16. }  


            根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是: 
            1、workQueue.offer(command) 
            workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。 
            2、ensureQueuedTaskHandled(command) 
            这个是线程执行的关键语句。看看它的源码: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     private void ensureQueuedTaskHandled(Runnable command) {  
    4.         final ReentrantLock mainLock = this.mainLock;  
    5.         mainLock.lock();  
    6.         boolean reject = false;  
    7.         Thread t = null;  
    8.         try {  
    9.             int state = runState;  
    10.             if (state != RUNNING && workQueue.remove(command))  
    11.                 reject = true;  
    12.             else if (state < STOP &&  
    13.                      poolSize < Math.max(corePoolSize, 1) &&  
    14.                      !workQueue.isEmpty())  
    15.                 t = addThread(null);  
    16.         } finally {  
    17.             mainLock.unlock();  
    18.         }  
    19.         if (reject)  
    20.             reject(command);  
    21.         else if (t != null)  
    22.             t.start();  
    23.     }  
    24.     ..........  
    25. }  


            在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     private Thread addThread(Runnable firstTask) {  
    4.         Worker w = new Worker(firstTask);  
    5.         Thread t = threadFactory.newThread(w);  
    6.         if (t != null) {  
    7.             w.thread = t;  
    8.             workers.add(w);  
    9.             int nt = ++poolSize;  
    10.             if (nt > largestPoolSize)  
    11.                 largestPoolSize = nt;  
    12.         }  
    13.         return t;  
    14.     }  
    15.     ..........  
    16. }  


            这里两个重点,很明显: 
            1、Worker w = new Worker(firstTask) 
            2、Thread t = threadFactory.newThread(w) 
            先看Worker是个什么结构: 

    1. public class ThreadPoolExecutor extends AbstractExecutorService {  
    2.     ..........  
    3.     private final class Worker implements Runnable {  
    4.         ..........  
    5.         Worker(Runnable firstTask) {  
    6.             this.firstTask = firstTask;  
    7.         }  
    8.   
    9.         private Runnable firstTask;  
    10.         ..........  
    11.   
    12.         public void run() {  
    13.             try {  
    14.                 Runnable task = firstTask;  
    15.                 firstTask = null;  
    16.                 while (task != null || (task = getTask()) != null) {  
    17.                     runTask(task);  
    18.                     task = null;  
    19.                 }  
    20.             } finally {  
    21.                 workerDone(this);  
    22.             }  
    23.         }  
    24.     }  
    25.   
    26.     Runnable getTask() {  
    27.         for (;;) {  
    28.             try {  
    29.                 int state = runState;  
    30.                 if (state > SHUTDOWN)  
    31.                     return null;  
    32.                 Runnable r;  
    33.                 if (state == SHUTDOWN)  // Help drain queue  
    34.                     r = workQueue.poll();  
    35.                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  
    36.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  
    37.                 else  
    38.                     r = workQueue.take();  
    39.                 if (r != null)  
    40.                     return r;  
    41.                 if (workerCanExit()) {  
    42.                     if (runState >= SHUTDOWN) // Wake up others  
    43.                         interruptIdleWorkers();  
    44.                     return null;  
    45.                 }  
    46.                 // Else retry  
    47.             } catch (InterruptedException ie) {  
    48.                 // On interruption, re-check runState  
    49.             }  
    50.         }  
    51.     }  
    52.     }  
    53.     ..........  
    54. }  


            Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
            在看看newThread是一个什么方法: 

    1. public class Executors {  
    2.     ..........  
    3.     static class DefaultThreadFactory implements ThreadFactory {  
    4.         ..........  
    5.         public Thread newThread(Runnable r) {  
    6.             Thread t = new Thread(group, r,  
    7.                                   namePrefix + threadNumber.getAndIncrement(),  
    8.                                   0);  
    9.             if (t.isDaemon())  
    10.                 t.setDaemon(false);  
    11.             if (t.getPriority() != Thread.NORM_PRIORITY)  
    12.                 t.setPriority(Thread.NORM_PRIORITY);  
    13.             return t;  
    14.         }  
    15.         ..........  
    16.     }  
    17.     ..........  
    18. }  


            通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。 

            之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。 
            从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。

  • 相关阅读:
    ACM的算法分类 2015-04-16 14:25 22人阅读 评论(0) 收藏
    初学Larevel 2014-08-21 11:24 90人阅读 评论(0) 收藏
    初学PHP&MySQL 2014-05-31 12:40 92人阅读 评论(0) 收藏
    codeforces 570 E. Pig and Palindromes (dp)
    codeforces 570 D. Tree Requests (dfs序)
    poj 2157 Maze (bfs)
    cf 570 C. Replacement (暴力)
    cf 570B B. Simple Game(构造)
    cf 570 A. Elections
    hdu 1429胜利大逃亡(续) (bfs+状态压缩)
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13318138.html
Copyright © 2011-2022 走看看