zoukankan      html  css  js  c++  java
  • 这是我看过最好的一篇关于Executor线程池的解析使用

    线程池介绍

    线程池是一个线程集合,当有任务到来时线程池会为任务分配一个线程用于执行任务,如果没有任务到来线程池里面的线程就处于空闲状态;

    如果不使用线程池,当每个任务来时都会为其创建一个线程:代码如下

    public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                // 为每个请求都创建一个新的线程
                final Socket connection = socket.accept();
                Runnable task = () -> handleRequest(connection);
                new Thread(task).start();
            }
        }
        private static void handleRequest(Socket connection) {
            // ...
        }
    

    这种模式在非生产模式中完全没有问题,但缺陷非常明显

    • 线程的生命周期开销非常高: 线程的创建和销毁根据不同的平台开销不同,但每个线程都有自己的生命周期,都需要时间和系统资源,如果是大数据计算消耗的系统资源更加恐怖;
    • 内存消耗大:多个线程运行,对cpu的竞争,内存的分配都非常耗资源,如果无限制的线程系统性能将大幅度降低,得不偿失;
    • 稳定性差:当系统遭到攻击时,或者抛出异常时,系统的恢复能力差;

    Executor

    Executor介绍

    Executor 接口中只定义了一个 execute 方法,用于接收一个 Runnable 对象,Runnable 可以表示任务,适用于生产消费者模式,当任务提交表示生产者,执行任务表示消费者;

    public interface Executor {
        void execute(Runnable command);
    }
    

    Executor 有许多实现类,子类目录如下;

    • Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开
    • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
    • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执
      行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
    • Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
    • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。

    我们使用固定大小的线程池就可以避免无限制的创建线程,减少资源浪费;如下所示创建一个50大小的线程池,当任务来时,从线程池中拿线程执行任务;

     private static final int NUMBER = 50;
        private static ExecutorService fixedThreadPool =  Executors.newFixedThreadPool(NUMBER);
        public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                // 为每个请求都创建一个新的线程
                final Socket connection = socket.accept();
                Runnable task = () -> handleRequest(connection);
                fixedThreadPool.execute(task);
            }
        }
    
        private static void handleRequest(Socket connection) {
            // ...
        }
    

    线程池创建方式

    线程池的本质目的是代替Threan, 避免无限制创建线程,减少资源开销;可以通过Executors中的静态方法来创建线程池;

    • newFixedThreadPool创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大数量。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程;
    • newCachedThreadPool创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒处于等待任务到来)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是Integer的最大值(2^31-1);
    • newSingleThreadExecutor创建一个单线程的线程池。该线程池是单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。该线程所有任务安装顺序执行;
    • newScheduledThreadPool: 创建一个固定长度的线程池,可以延时或者定时执行;

    线程池生命周期

    executors启动的线程为非守护线程,如果没有及时关闭executors启动的线程,那么JVM将不会自动关闭;在 executors 的扩展类 ExecutorService 中定义了 executors 的生命周期,其有三种状态,当创建任务时,处于运行态,当执行 shutdown 时 处于缓慢关闭状态(不再接受提交任务,等队列中任务全部执行完再关闭线程池),当执行 shutdownNow 时 粗暴关闭(不管队列中是否存在任务,马上关闭线程池);

    源码如下

    public interface ExecutorService extends Executor {
    
        void shutdown();
    
    
        List<Runnable> shutdownNow();
    
    
        boolean isShutdown();
    
    
        boolean isTerminated();
    
    
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
    
        <T> Future<T> submit(Callable<T> task);
    
    
        <T> Future<T> submit(Runnable task, T result);
    
        Future<?> submit(Runnable task);
    
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    使用示例如下:

        public static void main(String[] args) {
    
            ExecutorService executor = Executors.newSingleThreadExecutor();
    
            IntStream.range(0, 5).forEach(i -> executor.execute(() -> {
                // 执行任务
                String threadName = Thread.currentThread().getName();
                System.out.println("线程名称: " + threadName);
            }));
            // 关闭线程池
            executor.shutdown();
        }
    

    在 ExecutorService中 提交执行任务有2种方式,第一种就是上面代码的excutor(Runable),执行后没有返回值,第二种就是submit(Callable),执行后返回一个Future,即执行后有返回值,表示任务执行完毕;

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future future = executorService.submit(new Callable(){
                @Override
                public Object call() throws Exception {
                    System.out.println("执行任务");
                    return "执行任务完成";
                }
            });
            executorService.shutdown();
            System.out.println(future.get());
        }
    

    FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("知识追寻者");
                    return 2;
                }
            });
            //可以作为Runnable类型对象使用
            Thread t1 = new Thread(futureTask);
            t1.start();
            while (true){
                // 任务执行完成
                if (futureTask.isDone()){
                    System.out.println(futureTask.get());
                    break;
                }
            }
        }
    

    ScheduledExecutorService

    cheduledExecutorServiceExecutorService 接口的扩展,支持延迟执行任务和定时执行任务, 并且是Timer 定时器的代替方案;

    源码如下

    public interface ScheduledExecutorService extends ExecutorService {
    
        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay, TimeUnit unit);
    
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay, TimeUnit unit);
    
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
    
    }
    
    • schedule 方法可以延时执行一个 Runnable 或者 Callable 任务。
    • scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法可以按照指定时间间隔定期执行任务。

    使用示例

    public static void main(String[] args) {
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3);
            // 延时1秒,每2秒执行一次
            executor.scheduleAtFixedRate(() -> System.out.println(System.currentTimeMillis()), 1000, 2000, TimeUnit.MILLISECONDS);
            try {
                // 保证任务都执行完毕
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                executor.shutdown();
            }
        }
    
    

    关注公众知识追寻者,获取原创PDF,面试集;

    ThreadPoolExecutor

    Executor框架最核心的类是ThreadPoolExecutor, 其继承 AbstractExecutorService , 构造方法如下,总共有4个

    public class ThreadPoolExecutor extends AbstractExecutorService {
        // ...
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                				  BlockingQueue<Runnable> workQueue);
     
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                				  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory);
     
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                				  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler);
     
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
            					  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler);
       // ...
    }
    

    核心的构造方法如下

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
    
    • corePoolSize : 线程池大小,即线程数量
    • maximumPool:最大线程池的大小;
    • BlockingQueue:用来暂时保存任务的工作队列;
    • keepAliveTime:表示线程没有任务执行时最多保持多久时间线程会终止。线程池中的线程不会立即销毁,仅当线程池中的线程数大于corePoolSize 时,多余的空闲线程会活跃keepAliveTime时间之后才销毁;
    • unitkeepAliveTime 的时间单位;
    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    
    • workQueue :等待执行的任务队列
    ArrayBlockingQueue // 有界阻塞队列。先进先出队列(FIFO),必须指定大小。
    LinkedBlockingQueue // 无界阻塞队列。先进先出队列(FIFO)。如果没有指定队列大小,则默认为 int 最大值
    SynchronousQueue // 容量为0的无界队列
    PriorityBlockingQueue // 具有优先级的无界阻塞队列。
    
    • threadFactory:线程工厂,主要用来创建线程;

    handler :任务饱和时的处理策略;

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认)。 
    ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:直接调用 run 方法并且阻塞执行;
    

    使用示例

        public static void main(String[] args) {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(100),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            threadPoolExecutor.execute(()->{
                System.out.println("公众号:知识追寻者, 原创PDF,系统教程知识,期待您的关注");
            });
            threadPoolExecutor.shutdown();
        }
    

    Executors引起的OOM

    在阿里巴巴规范中建议不要使用Executors的静态方法去创建线程池,原因是线程容量为 Integer.MAX_VALUE的无界队列会引起OOM;

    设置 jvm 堆内存大小

    -Xms5M -Xmx5M 
    

    测试代码

     public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i <Integer.MAX_VALUE ; i++) {
                executorService.submit(()-> {
                    // do nothing
                    System.out.println("zszxz");
                } );
            }
        }
    

    跑几十秒程序就马上出现OOM异常;

    newCachedThreadPool源码如下

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    newSingleThreadExecutor 源码如下

     public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    LinkedBlockingQueue 如果未指定大小,则默认为 Integer.MAX_VALUE,可以认为是一个无界队列,会无限制创建线程;同理 SynchronousQueue 是容量为0的无界队列,来一个任务会创建一个线程,可以创建 Integer.MAX_VALUE个线程,所以也认为 会创建无限个线程;故 推荐使用上一节的 ThreadPoolExecutor 方式创建线程池,并指定队列大小;

  • 相关阅读:
    【AGC010 C】Cleaning
    【未知来源】火神的鱼
    【2017 北京集训 String 改编版】子串
    【未知来源】记忆
    【2017 江苏集训】子串
    【未知来源】循环移位
    【未知来源】K-th String
    【hdu 6067】Big Integer
    【CERC 2014 E】2048
    【hdu 6155】Subsequence Count
  • 原文地址:https://www.cnblogs.com/zszxz/p/14176805.html
Copyright © 2011-2022 走看看