zoukankan      html  css  js  c++  java
  • Java并发之ThreadPoolExecutor源码解析(一)

    线程池

    假设我们编写了一个Servlet应用,当用户通过浏览器发起一个请求到达我们服务器时,传统的Servlet应用一般针对一个用户请求创建一个线程去执行请求,等到请求执行完毕后,再销毁线程。这种设计在用户量几百或者几千的情况下一般不会有什么大问题,但是如果我们的用户量上达几万甚至几十万几百万,频繁的创建、销毁线程,将会给服务器带来巨大的开销,甚至会出现OOM(Out Of Memory)异常。因此,为了节省资源的消耗,提高资源的利用率,引出了线程池化技术。

    线程池会维护若干线程,等待任务的到来,避免重复创建、销毁线程造成的消耗,提高任务的响应速度,不需要创建线程就可以立即执行任务,使用线程池可以进行统一的分配、调优和监控,避免无节制的创建线程,提高系统的稳定性。

    当然,线程池也并非十全十美,它也有力不从心的场景,例如:线程池适合生命周期较短的任务,不适合耗时较长的任务;线程池无法设置任务的优先级,也无法单独启动或者终止某个线程。

    现在,我们对比一下线程池执行任务和创建线程执行任务的优势。在ThreadTest中,我们声明了一个线程安全的list,并创建10000个线程并发往list添加随机值,最后我们等待所有线程执行完毕,打印程序的执行时间和list的大小。

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;
    
    public class ThreadTest {
    
        public static void main(String[] args) throws InterruptedException {
            Long start = System.currentTimeMillis();
            final Random random = new Random();
            final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
            final List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 10000; i++) {
                Thread thread = new Thread(() -> list.add(random.nextInt()));
                thread.start();
                threads.add(thread);
            }
            for (Thread thread : threads) {
                thread.join();
            }
            System.out.println("时间:" + (System.currentTimeMillis() - start));
            System.out.println("大小:" + list.size());
        }
    }
    

        

    执行结果:

    时间:882
    大小:10000
    

      

    可以看到list的长度为10000,程序执行了882毫秒。

    下面,我们用线程池的方式来执行相同的逻辑,我们声明一个线程池executorService,并往线程池中提交10000个任务,每个任务都向list添加一个随机值:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class ThreadPoolTest {
    
        public static void main(String[] args) throws InterruptedException {
            Long start = System.currentTimeMillis();
            final Random random = new Random();
            final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10000; i++) {
                executorService.execute(() -> list.add(random.nextInt()));
            }
            executorService.shutdown();
            executorService.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("时间:" + (System.currentTimeMillis() - start));
            System.out.println("大小:" + list.size());
        }
    }
    

      

    执行结果:

    时间:52
    大小:10000
    

      

    可以看到线程池的执行时间相比创建线程,大大缩短。

    下面,我们就从源码的角度,来剖析线程池的工作原理。ThreadPoolExecutor是java.util.concurrent包下提供的线程池实现类,下图是ThreadPoolExecutor类的继承关系,我们将从上至下逐个分析ThreadPoolExecutor的父类:Executor、ExecutorService、AbstractExecutorService。

    Executor

    我们先来看下Executor接口定义:

    public interface Executor {
        void execute(Runnable command);
    }
    

      

    Executor允许我们提交若干待执行的任务,我们不再像以前一样用new Thread(new RunnableTask()).start()的方式启动一个线程去执行RunnableTask的run()方法,取而代之的是用Executor的实现类去执行,比如:

     Executor executor = anExecutor;
     executor.execute(new RunnableTask1());
     executor.execute(new RunnableTask2());
     ...
    

      

    Executor提供了一种新的方式,我们只需提交任务,Executor自身负责如何调度线程来执行任务。Executor并不要求任务的执行必须是异步的,也可以在提交完任务后,同步执行任务:

     class DirectExecutor implements Executor {
         public void execute(Runnable r) {
             r.run();
         }
     }
    

      

    通常情况下,Executor会将提交过来的任务放在另一个线程执行,而不是通过调用线程来执行:

     class ThreadPerTaskExecutor implements Executor {
         public void execute(Runnable r) {
             new Thread(r).start();
         }
     }
    

      

    一些Executor接口的实现在调度线程执行任务时会添加一些限制,比如我们可以以代理模式的思想来封装Executor:

     class SerialExecutor implements Executor {
       final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
       final Executor executor;
       Runnable active;
    
       SerialExecutor(Executor executor) { 
         this.executor = executor;
       }
    
       public synchronized void execute(final Runnable r) {
         tasks.offer(new Runnable() {
           public void run() {
             try {
               r.run();
             } finally {
               scheduleNext();
             }
           }
         });
         if (active == null) { 
           scheduleNext();
         }
       }
    
       protected synchronized void scheduleNext() {
         if ((active = tasks.poll()) != null) { 
           executor.execute(active);
         }
       }
     }
    

      

    在创建SerialExecutor对象时,会要求传入一个executor对象,执行任务并不是SerialExecutor对象本身,SerialExecutor并不执行任务,只是将任务缓存到队列tasks,而executor才是真正负责执行任务。

    ExecutorService

    ExecutorService扩展了Executor,ExecutorService不但具备Executor执行任务的能力,我们还可以关闭ExecutorService,这将使ExecutorService拒绝接受新提交的任务。ExecutorService.submit(...)方法是基于Executor.execute(Runnable command)封装的,execute方法没有任何返回,而submit会返回一个Future对象,通过Future对象我们可以取消任务或者等待任务未来的执行结果。

    public interface ExecutorService extends Executor {
    
        /**
         * 关闭线程池,调用此方法后不再接受新任务,但会处理线程池内
         * 尚未完成的任务,如果线程池已经关闭,再次调用此方法将无事发生。
         * 这个方法不会等待已提交但尚未完成的任务执行完毕,需要调用awaitTermination(long timeout, TimeUnit unit)
         * 来等待。
         */
        void shutdown();
    
        /**
         * 调用此方法会尝试停止所有正在运行的线程,比如:调用Thread.interrupt()
         * 标记线程已中断,如果任务没有响应中断则线程无法停止。这个方法会返回尚未
         * 执行的任务列表,它不会等待正在执行的任务执行完毕,需要调用awaitTermination(long timeout, TimeUnit unit)
         * 来等待。
         */
        List<Runnable> shutdownNow();
    
        /**
         * 判断线程池是否已关闭,true为关闭。
         */
        boolean isShutdown();
    
        /**
         * 如果调用shutdown()或者shutdownNow()后,所有任务都已完成,则返回true
         */
        boolean isTerminated();
    
        /**
         * 使当前调用此方法线程陷入阻塞,直到:
         *      1.调用停止线程池方法后,完成所有任务。
         *      2.阻塞超时。
         *      3.调用此方法线程被中断。
         *
         * @param timeout 最大等待时长
         * @param unit    时长单位
         * @return 如果任务都执行完毕返回true,如果超时或中断则返回false
         */
        boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException;
    
        /**
         * 提交一个有返回值的任务后,将返回一个Future对象代表任务的运行结果,
         * 可以调用Future.get()获得任务的执行结果,如果提交任务后想立即获得结果,
         * 可以用:result = exec.submit(aCallable).get();这样的方式获得,
         * 调用线程将陷入阻塞,直到线程池执行完任务,执行结果被放入到Future对象。
         *
         * @param task 待执行的任务。
         * @param <T>  任务执行结果的类型。
         * @return 返回Future对象,线程池执行完毕任务后,执行结果会被放入到Future对象。
         */
        <T> Future<T> submit(Callable<T> task);
    
        /**
         * 提交一个可执行的任务和给定的执行结果,并返回一个Future对象代表该任务将来的运行结果,
         * 如果任务成功执行,Future对象将返回我们给定的执行结果。
         *
         * @param task   待执行的任务
         * @param result 任务执行完毕后的返回结果
         * @param <T>    返回结果的类型
         * @return 返回Future对象,线程池执行完毕任务后,执行结果会被放入到Future对象。
         */
        <T> Future<T> submit(Runnable task, T result);
    
        /**
         * 提交待执行的任务并返回Future对象代表该任务未来的运行结果,如果任务执行成功,
         * 调用Future.get()将返回null。
         *
         * @param task 待执行的任务
         * @return 返回Future对象,代表该任务未来的运行结果。
         */
        Future<?> submit(Runnable task);
    }
    

      

    我们可以用ExecutorService来模拟一个网络服务,用Executors.newFixedThreadPool(int)工厂方法生成的线程池中的线程来处理传入的网络请求:

     class NetworkService implements Runnable {
       private final ServerSocket serverSocket;
       private final ExecutorService pool;
    
       public NetworkService(int port, int poolSize)
           throws IOException {
         serverSocket = new ServerSocket(port);
         pool = Executors.newFixedThreadPool(poolSize);
       }
    
       public void run() { // run the service
         try {
           for (;;) {
             pool.execute(new Handler(serverSocket.accept()));
           }
         } catch (IOException ex) {
           pool.shutdown();
         }
       }
     }
    
     class Handler implements Runnable {
       private final Socket socket;
       Handler(Socket socket) { this.socket = socket; }
       public void run() {
         // read and service request on socket
       }
     }
    

      

    关闭ExecutorService分两个阶段,首先调用shutdown()拒绝再有新的任务提交,然后在必要的时候调用shutdownNow(),尝试中断正在执行的任务。

     void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
         // Wait a while for existing tasks to terminate
         if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
               System.err.println("Pool did not terminate");
         }
       } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
       }
     }
    

      

    AbstractExecutorService

    AbstractExecutorService是juc(java.util.concurrent)包下提供的ExecutorService接口的默认实现,在newTaskFor方法中返回FutureTask作为RunnableFuture接口的实现。可以看到不论是Runnable还是Callable类型的任务,都会被封装成RunnableFuture类型的任务来执行。

    public abstract class AbstractExecutorService implements ExecutorService {
    
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    	
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    	
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    	
    }
    

      

  • 相关阅读:
    Scala (三)集合
    为什么成为一名程序员?
    Hadoop——Yarn
    Redis(一)NoSQL简介、Redis安装 、数据类型、配置文件、发布订阅
    Java并发编程——共享模型之内存( JMM、原子性、可见性、有序性、volatile原理)
    KafkaAPI实战案例
    分布式技术原理笔记(二)分布式体系结构
    Flume 进阶
    Kafka框架基础
    Scala (二)面向对象
  • 原文地址:https://www.cnblogs.com/beiluowuzheng/p/14393058.html
Copyright © 2011-2022 走看看