zoukankan      html  css  js  c++  java
  • Java线程池之ThreadPoolExecutor

    前言

      线程池可以提高程序的并发性能(当然是合适的情况下),因为对于没有线程的情况下,我们每一次提交任务都新建一个线程,这种方法存在不少缺陷:

    1.  线程的创建和销毁的开销非常高,线程的创建需要时间,会延迟任务的执行,会消耗大量的系统资源。

    2.  活跃的线程会消耗系统资源,而大量的空闲线程会占用许多内存,给垃圾回收器带来很大的压力,而大量线程在竞争CPU资源的时间还会产生气体的性能开销。

    3.  系统在可创建的线程上存在一个限制,如果超过了这个限制,很可能抛出OOM。

      我们不难发现,在一定范围下,增加线程能够提高系统的吞吐量,而当线程数超过合理值后,增加的线程反而会降低程序的执行速度。

    Executor的引入

      在JDK1.5中,我们引入了一个线程池框架,Executor框架,它能够分解任务的创建和执行过程。它包括Executor、ExecutorService、Callable等接口和Executors、ThreadPoolExecutor实现类等。

    注意点

      当然了看待事物需要辩证,是否使用了Executor框架就能很好地将复杂的任务执行解耦开来的。这边我们其实需要限制一下它的使用范围。

    •          对于依赖型的任务而言,不是很适合使用线程池去操作,容易引发死锁,因为这种情况下我们需要小心维持这些任务的执行顺序,以保证不会触发死锁。
    •          对于通过线程封闭实现线程安全的任务而言,使用单线程的Executor能够保证更安全的并发。
    •          使用ThreadLocal的任务,因为线程池会复用线程,这将导致任务的ThreadLocal值失去意义(除非线程本地值受限于任务的生命周期)。
    •          对响应时间敏感的任务,假设我们将一个执行时间很长的任务,或者多个执行时间很长的任务放到一个单线程的Executor中或者一个包含少量线程的线程池中,都会降低程序的响应速度。

    合适的线程数

      线程池在对处理同一类型的任务且相互独立的时候,能达到性能上的最佳,否则任务时长不一致很容易引起拥塞或是饥饿。

    那线程池的线程数以多少为合适呢,对于计算密集型的任务而言,我们最好设置的线程数 = CPU数+1;(+1是为了保证当某个线程因为缺页故障或其他原因而暂停时,这个+1的线程能够确保CPU的时钟周期不会被浪费);而对于包含IO操作或者其他阻塞操作的任务时,由于线程不会一直执行,所以线程池的规模应该更大点。

    线程池的使用 

     在实际使用过程中,我们一般借助于ThreadPoolExecutor来完成线程池的创建。ThreadPoolExecutor具有极好的扩展性,除了系统提供的四种常用的线程池,如CachedThreadPoolExecutor,FixedThreadPoolExecutor,SingleThreadPoolExecutor,ScheduledThreadExecutor。我们可以自定义线程池的构造函数,如线程池的基本线程数、最大线程数、线程池的超时时间(时间+时间单位),线程池的任务队列,线程池的线程工厂,线程池的饱和策略。

    当然,我们在使用系统提供的四种线程池的时候,同样可以在后来修改线程池的配置。

    线程的任务队列

    LinkedBlockQueue:无界   CachedThreadPoolExecutor  FixedThreadPoolExecutor的默认任务队列

    ArrayBlockQueue:有界

    PriorityQueue:优先级队列,有界   ScheduledThreadPoolExecutor的默认任务队列

    SynchronousQueue:同步移交,队列容量为0,仅当有线程准备好时才会将任务放到队列中。

    饱和策略

    饱和策略的设置

      对于有界队列而言,当有界队列被填满后,这时候我们需要用到线程的饱和策略,前面提到我们可以在后面配置线程池的设置,而饱和策略的修改就是通过ThreadPoolExecutor的setRejectedExecutionHandler方法来进行修改的(当某个任务呗提交到一个已经被关闭的Executor中,也会用到饱和策略)

             饱和策略主要有以下几种:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。

    1.  AbortPolicy:中止策略,该策略将抛出未受检的RejectedExecution,调用者可以捕获这个异常根据实际需要编写直接的处理代码。
    2. CallerRunsPolicy:调用者运行,该策略不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者中,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是调用了execute的线程中执行任务,当线程池的任务队列被填满后,下一个任务会在调用execute时在主线程中执行,由于执行任务需要一定时间,这段时间内主线程显然不能提交任务,从而保证线程池在这段时间内处理现有任务。
    3. DiscardPolicy:抛弃策略,舍弃任务。
    4. DiscardOldestPolicy:抛弃下一个将被执行的任务,然后尝试重新提交新的任务,(不适用于和优先队列合用,因为这样将抛弃的将是优先级最高的等待任务)

             当然,我们可以使用Semaphore(信号量)来控制任务的提交速率。

    线程工厂 

      我们可以通过自定义线程工厂,来实现我们自己的线程。具体示例如下所示:

      自定义的线程工厂,记录了线程池的名字。

    import java.util.concurrent.ThreadFactory;
    
    /**
     * Created by DB on 2017/9/1.
     */
    public class MyThreadFactory implements ThreadFactory {
        private final String poolName;
    
        public MyThreadFactory(String poolName) {
            this.poolName = poolName;
        }
    
        @Override
        public Thread newThread(Runnable r) {
            return new MyAPPThread(r,poolName);
        }
    }
    

    自定义的线程类:实现了指定线程的名字,设置自定义UncaughtExecptionHandler。

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /**
     * Created by DB on 2017/9/1.
     */
    public class MyAPPThread extends Thread {
        public static  final  String DEFALUT_NAME = "MyAppThread";
        private static volatile boolean debugLifecycle = false;
        private static final AtomicInteger created = new AtomicInteger();
        private static final AtomicInteger alive = new AtomicInteger();
        private static final Logger log = Logger.getAnonymousLogger();
    
        public MyAPPThread(Runnable r){
            this(r,DEFALUT_NAME);
        }
        public MyAPPThread(Runnable r,String name){
            super(r,name +"-"+created.incrementAndGet());
            setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    log.log(Level.SEVERE,"Uncaught in thread"+t.getName(),e);
                }
            });
        }
    
        public void run(){
            boolean debug = debugLifecycle;
            if(debug){
                log.log(Level.FINE,"Created" +getName());
            }
            try {
                alive.incrementAndGet();
                super.run();
            }finally {
                alive.decrementAndGet();
                if(debug){
                    log.log(Level.FINE,"Exiting"+getName());
                }
            }
        }
        public static int getThreadsCreated(){
            return  created.get();
        }
        public static int getThreadsAlive(){
            return alive.get();
        }
        public static boolean getDebug(){
            return  debugLifecycle;
        }
        public static  void setDebug(Boolean b){
            debugLifecycle=b;
        }
    }

    扩展ThreadPoolExecutor

      ThreadPoolExecutor是可扩展的,它提供了几个在子类中可以改写的方法:beforeExecute、afterExecute和terminated,通过实现这些方法我们可以实现扩展。

             在执行任务的线程中将调用beforeExecute和afterExecute方法,在这些方法中我们可以添加日志、计时。监视或者统计信息收集的功能。无论任务是从run中正常返回还是抛出一个异常而返回,afterExecute都会被调用。而beforeExecute抛出RuntimeException时,任务将不被执行,afterExecute当然也不会被调用。

             在线程池完成关闭操作时,会调用terminated,也就在所有任务都已经完成并且所有工作者线程也已经关闭后。在这个方法中我们可以实现Executor在其生命周期中分配的各种资源,还有执行发送通知、记录日志或者收集finalize统计信息等操作。

      具体的框架如下:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by DB on 2017/9/1.
     */
    public class MyThreadPoolExecutor extends ThreadPoolExecutor {
        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
        }
    
        @Override
        protected void terminated() {
            super.terminated();
        }
    }
  • 相关阅读:
    leetcode 48. Rotate Image
    leetcode 203. Remove Linked List Elements 、83. Remove Duplicates from Sorted List 、82. Remove Duplicates from Sorted List II(剑指offer57 删除链表中重复的结点) 、26/80. Remove Duplicates from Sorted ArrayI、II
    leetcode 263. Ugly Number 、264. Ugly Number II 、313. Super Ugly Number 、204. Count Primes
    leetcode 58. Length of Last Word
    安卓操作的一些问题解决
    leetcode 378. Kth Smallest Element in a Sorted Matrix
    android studio Gradle Build速度加快方法
    禁用gridview,listview回弹或下拉悬停
    Android Studio找不到FragmentActivity类
    安卓获取ListView、GridView等滚动的距离(高度)
  • 原文地址:https://www.cnblogs.com/hustzhb/p/7464094.html
Copyright © 2011-2022 走看看