zoukankan      html  css  js  c++  java
  • 线程池使用的N种姿势

    线程池在开发中一定会用到,如果能像golang一样,java语言也有协程,也许java程序员就少了一种包袱。

    回归正题,我们聊下到底有哪些线程池的使用方式,总结有以下几种。

    1. JDK 内置线程池
    2. Spring线程池
    3. 自己魔改封装

    1、JDK 内置线程池

    常用的有:

    我们看下最全的线程池参数,探究为什么阿里规约不建议使用Executors创建默认个数的线程池。

    /**
    参数【7个】
    corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut
    maximumPoolSize - 池中允许的最大线程数
    keepAliveTime - 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。
    unit - keepAliveTime参数的时间单位
    workQueue - 用于在执行任务之前使用的队列。 这个队列将仅保存execute方法提交的Runnable任务。
    threadFactory - 执行程序创建新线程时使用的工厂
    handler - 执行被阻止时使用的处理程序,因为达到线程限制和队列容量
    
    四个预定义的处理程序策略
    在默认ThreadPoolExecutor.AbortPolicy ,处理程序会引发运行RejectedExecutionException后排斥反应。
    在ThreadPoolExecutor.CallerRunsPolicy中,调用execute本身的线程运行任务。 这提供了一个简单的反馈控制机制,将降低新任务提交的速度。
    在ThreadPoolExecutor.DiscardPolicy中 ,简单地删除无法执行的任务。
    在ThreadPoolExecutor.DiscardOldestPolicy中 ,如果执行程序没有关闭,则工作队列头部的任务被删除,然后重试执行(可能会再次失败,导致重复)。
    
    **/
    
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    

    1.1 阿里规约原文

    线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

    说明:Executors返回的线程池对象的弊端如下:

    • 1)FixedThreadPool和SingleThreadPool:

      允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

    • 2)CachedThreadPool:

      允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

    1.2 阿里规约是否是多余的?不然jdk为啥提供这个api?

    答案可以看下ThreadPoolExecutor类的上的注释,已翻译成中文:

    为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 然而,程序员被敦促使用更方便的Executors工厂方法Executors.newCachedThreadPool() (无限线程池,具有自动线程回收), Executors.newFixedThreadPool(int) (固定大小的线程池)和Executors.newSingleThreadExecutor() (单个后台线程),可以预先配置最常用的使用场景设置。 否则,手动配置和调优此类时,请使用以下指南...
    

    其实这个类的作者"Doug Lea",已经说明Executors.newCachedThreadPool(),Executors.newFixedThreadPool(int),应该是在什么情况下使用?
    只是一般程序员是不会去仔细看类说明文档,都是跟着百度复制过来的代码就干起活来了,
    说起来也惭愧之前在2010年培训的时候,其实是会看这个jdk api文档的,但那个时候似乎很懵,也不会去详尽的看文档背后的一些知识点。
    所以说看第一手材料多么重要,作者已经把该注意的点都写到类说明文档注释里了,吓得我赶紧用到一个类都去细看下文档注释,解决不了再找度娘。

    1.3 线程池不为人知的几个冷门知识点

    从几个面试题来一一道来。

    1.核心和最大线程池是否都可以动态修改?

    可以使用setCorePoolSize(int)和setMaximumPoolSize(int)进行动态 更改

    2.核心线程最初创建并且只有在新任务到达时才启动?

    可以使用方法prestartCoreThread()或prestartAllCoreThreads()动态地覆盖。如果您使用非空队列构建池,则可能需要预先提供线程。

    3.核心线程是否一定不终止?

    不一定,报错或者allowCoreThreadTimeOut(boolean)也可以用于将这个超时策略应用于核心线程,只要keepAliveTime值不为零

    4.如何给线程池添加一个启动/暂停的功能?

    class PausableThreadPoolExecutor extends ThreadPoolExecutor {              
      private boolean isPaused;                                                
      private ReentrantLock pauseLock = new ReentrantLock();                   
      private Condition unpaused = pauseLock.newCondition();                   
                                                                               
      public PausableThreadPoolExecutor(...) { super(...); }                   
                                                                               
      protected void beforeExecute(Thread t, Runnable r) {                     
        super.beforeExecute(t, r);                                             
        pauseLock.lock();                                                      
        try {                                                                  
          while (isPaused) unpaused.await();                                   
        } catch (InterruptedException ie) {                                    
          t.interrupt();                                                       
        } finally {                                                            
          pauseLock.unlock();                                                  
        }                                                                      
      }                                                                        
                                                                               
      public void pause() {                                                    
        pauseLock.lock();                                                      
        try {                                                                  
          isPaused = true;                                                     
        } finally {                                                            
          pauseLock.unlock();                                                  
        }                                                                      
      }                                                                        
                                                                               
      public void resume() {                                                   
        pauseLock.lock();                                                      
        try {                                                                  
          isPaused = false;                                                    
          unpaused.signalAll();                                                
        } finally {                                                            
          pauseLock.unlock();                                                  
        }                                                                      
      }                                                                        
    }}                                                             
    

    2、Spring线程池

    官方已经实现的全部7个TaskExecuter。Spring宣称对于任何场景,这些TaskExecuter完全够用了:

    名字 特点
    SimpleAsyncTaskExecutor 每次请求新开线程,没有最大线程数设置.不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。限流是通过 ConcurrencyThrottleSupport类的monitor的wait(),notify() 实现的
    SyncTaskExecutor 不是异步的线程.同步可以用SyncTaskExecutor,但这个可以说不算一个线程池,因为还在原线程执行。这个类没有实现异步调用,只是一个同步操作。
    ConcurrentTaskExecutor Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。
    SimpleThreadPoolTaskExecutor 监听Spring’s lifecycle callbacks,并且可以和Quartz的Component兼容.是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。
    ThreadPoolTaskExecutor 最常用。要求jdk版本大于等于5。可以在程序而不是xml里修改线程池的配置.其实质是对java.util.concurrent.ThreadPoolExecutor的包装。
    TimerTaskExecutor
    WorkManagerTaskExecutor
    • 使用ThreadPoolExecutorFactoryBean
    package org.springframework.scheduling.concurrent;
    
    public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean {
        private int corePoolSize = 1;
        private int maxPoolSize = 2147483647;
        private int keepAliveSeconds = 60;
        private boolean allowCoreThreadTimeOut = false;
        private int queueCapacity = 2147483647;
        private boolean exposeUnconfigurableExecutor = false;
        @Nullable
        private ExecutorService exposedExecutor;
    
        public ThreadPoolExecutorFactoryBean() {
        }
        ...
        protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
            return new ThreadPoolExecutor(corePoolSize, maxPoolSize, (long)keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
        }
    
    • 使用ScheduledExecutorFactoryBean
    package org.springframework.scheduling.concurrent;
    
    public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport implements FactoryBean<ScheduledExecutorService> {
        private int poolSize = 1;
        @Nullable
        private ScheduledExecutorTask[] scheduledExecutorTasks;
        private boolean removeOnCancelPolicy = false;
        private boolean continueScheduledExecutionAfterException = false;
        private boolean exposeUnconfigurableExecutor = false;
        @Nullable
        private ScheduledExecutorService exposedExecutor;
    
        public ScheduledExecutorFactoryBean() {
        }
    
    • 使用ThreadPoolTaskExecutor
      这个类可以设置回调方法

    • 使用ThreadPoolTaskScheduler
      这个类可以设置回调方法,包装错误处理类,错误不会影响执行下一个任务 @Scheduled就是使用这个类包装的,注意核心默认一个线程,会阻塞任务

    	@Override
    	public void run() {
    		try {
    			this.delegate.run();
    		}
    		catch (UndeclaredThrowableException ex) {
    			this.errorHandler.handleError(ex.getUndeclaredThrowable());
    		}
    		catch (Throwable ex) {
    			this.errorHandler.handleError(ex);
    		}
    	}
    

    参考

    [java8 api] https://www.matools.com/api/java8

    [spring线程池(同步、异步)] https://www.cnblogs.com/duanxz/p/9435343.html

    扫描二维码,关注公众号“猿必过”

    file

    回复 “面试题” 自行领取吧。

    微信群交流讨论,请添加微信号:zyhui98,备注:面试题加群

    本文由猿必过 YBG 发布

    禁止未经授权转载,违者依法追究相关法律责任

    如需授权可联系:zhuyunhui@yuanbiguo.com

  • 相关阅读:
    selenium python 中浏览器操作
    wireshark基础学习—第三部分wireshark的过滤器语法
    wireshark基础学习—第二部分wireshark的基础操作
    wireshark基础学习—第一部分wireshark的基础知识
    Python 之 tuple
    Python 之 list
    python socketpool:通用连接池
    APScheduler 3.0.1浅析
    检查SDE版本健康情况的常用SQL语句
    免重启下刷新新添加的磁盘信息
  • 原文地址:https://www.cnblogs.com/javago/p/14478294.html
Copyright © 2011-2022 走看看