zoukankan      html  css  js  c++  java
  • ExecutorService与Executors例子的简单剖析

    对于多线程有了一点了解之后,那么来看看java.lang.concurrent包下面的一些东西。在此之前,我们运行一个线程都是显式调用了Thread的start()方法。我们用concurrent下面的类来实现一下线程的运行,而且这将成为以后常用的方法或者实现思路。

    看一个简单的例子:
    Java代码收藏代码
    1. public class CacheThreadPool {
    2. public staticvoid main(String[] args) {
    3. ExecutorService exec=Executors.newCachedThreadPool();
    4. for(int i=0;i<5;i++)
    5. exec.execute(new LiftOff());
    6. exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务
    7. }
    8. }


    这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。

    一、ExecutorService
    先看看ExecutorService,这是一个接口,简单的列一下这个接口:

    Java代码收藏代码
    1. public interface ExecutorServiceextends Executor {
    2. void shutdown();
    3. List<Runnable> shutdownNow();
    4. boolean isShutdown();
    5. boolean isTerminated();
    6. boolean awaitTermination(long timeout, TimeUnit unit)
    7. <T> Future<T> submit(Callable<T> task);
    8. <T> Future<T> submit(Runnable task, T result);
    9. Future<?> submit(Runnable task);
    10. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    11. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
    12. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    13. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    14. long timeout, TimeUnit unit)
    15. }


    ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法:

    Java代码收藏代码
    1. void execute(Runnable command)



    二、Executors
    Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用:

        Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:

    • Methods that create and return an ExecutorService set up with commonly useful configuration settings.
    • Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
    • Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
    • Methods that create and return a ThreadFactory that sets newly created threads to a known state.
    • Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.


    在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法:

    Java代码收藏代码
    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>());
    5. }


    在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息:

    Java代码收藏代码
    1. public class ThreadPoolExecutorextends AbstractExecutorService {
    2. ..........
    3. private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到
    4. ..........
    5. }


    ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。

    线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码:

    Java代码收藏代码
    1. public class ThreadPoolExecutorextends AbstractExecutorService {
    2. ..........
    3. public void execute(Runnable command) {
    4. if (command == null)
    5. throw new NullPointerException();
    6. if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
    7. if (runState == RUNNING && workQueue.offer(command)) {
    8. if (runState != RUNNING || poolSize == 0)
    9. ensureQueuedTaskHandled(command);
    10. }
    11. else if (!addIfUnderMaximumPoolSize(command))
    12. reject(command); // is shutdown or saturated
    13. }
    14. }
    15. ..........
    16. }


    根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是:
    1、workQueue.offer(command)
    workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。
    2、ensureQueuedTaskHandled(command)
    这个是线程执行的关键语句。看看它的源码:

    Java代码收藏代码
    1. public class ThreadPoolExecutorextends AbstractExecutorService {
    2. ..........
    3. private void ensureQueuedTaskHandled(Runnable command) {
    4. final ReentrantLock mainLock = this.mainLock;
    5. mainLock.lock();
    6. boolean reject = false;
    7. Thread t = null;
    8. try {
    9. int state = runState;
    10. if (state != RUNNING && workQueue.remove(command))
    11. reject = true;
    12. else if (state < STOP &&
    13. poolSize < Math.max(corePoolSize, 1) &&
    14. !workQueue.isEmpty())
    15. t = addThread(null);
    16. } finally {
    17. mainLock.unlock();
    18. }
    19. if (reject)
    20. reject(command);
    21. else if (t !=null)
    22. t.start();
    23. }
    24. ..........
    25. }


    在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码:

    Java代码收藏代码
    1. public class ThreadPoolExecutorextends AbstractExecutorService {
    2. ..........
    3. private Thread addThread(Runnable firstTask) {
    4. Worker w = new Worker(firstTask);
    5. Thread t = threadFactory.newThread(w);
    6. if (t != null) {
    7. w.thread = t;
    8. workers.add(w);
    9. int nt = ++poolSize;
    10. if (nt > largestPoolSize)
    11. largestPoolSize = nt;
    12. }
    13. return t;
    14. }
    15. ..........
    16. }


    这里两个重点,很明显:
    1、Worker w = new Worker(firstTask)
    2、Thread t = threadFactory.newThread(w)
    先看Worker是个什么结构:

    Java代码收藏代码
    1. public class ThreadPoolExecutorextends AbstractExecutorService {
    2. ..........
    3. private finalclass Worker implements Runnable {
    4. ..........
    5. Worker(Runnable firstTask) {
    6. this.firstTask = firstTask;
    7. }
    8. private Runnable firstTask;
    9. ..........
    10. public void run() {
    11. try {
    12. Runnable task = firstTask;
    13. firstTask = null;
    14. while (task != null || (task = getTask()) !=null) {
    15. runTask(task);
    16. task = null;
    17. }
    18. } finally {
    19. workerDone(this);
    20. }
    21. }
    22. }
    23. Runnable getTask() {
    24. for (;;) {
    25. try {
    26. int state = runState;
    27. if (state > SHUTDOWN)
    28. return null;
    29. Runnable r;
    30. if (state == SHUTDOWN) // Help drain queue
    31. r = workQueue.poll();
    32. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
    33. r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
    34. else
    35. r = workQueue.take();
    36. if (r != null)
    37. return r;
    38. if (workerCanExit()) {
    39. if (runState >= SHUTDOWN) // Wake up others
    40. interruptIdleWorkers();
    41. return null;
    42. }
    43. // Else retry
    44. } catch (InterruptedException ie) {
    45. // On interruption, re-check runState
    46. }
    47. }
    48. }
    49. }
    50. ..........
    51. }


    Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
    在看看newThread是一个什么方法:

    Java代码收藏代码
    1. public class Executors {
    2. ..........
    3. static class DefaultThreadFactoryimplements ThreadFactory {
    4. ..........
    5. public Thread newThread(Runnable r) {
    6. Thread t = new Thread(group, r,
    7. namePrefix + threadNumber.getAndIncrement(),
    8. 0);
    9. if (t.isDaemon())
    10. t.setDaemon(false);
    11. if (t.getPriority() != Thread.NORM_PRIORITY)
    12. t.setPriority(Thread.NORM_PRIORITY);
    13. return t;
    14. }
    15. ..........
    16. }
    17. ..........
    18. }


    通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。

    之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。
    从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。

  • 相关阅读:
    KafkaSpout 重复消费问题解决
    FastJson 输出值 首字母大小写问题
    Kafka0.7运行时报错 kafka/javaapi/consumer/ConsumerConnector : Unsupported major.minor version 51.0 解决
    Zookeeper原理与Curator使用
    Strom 消息处理机制 中英对照翻译 (Storm如何保证消息被完全处理)
    Mac安装 Storm 小结
    linux下实现ftp上传文件
    Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.apache.hadoop.hbase.client.Result
    Spark操作HBase
    maven-pom-project文件报错
  • 原文地址:https://www.cnblogs.com/yangkai-cn/p/4017242.html
Copyright © 2011-2022 走看看