zoukankan      html  css  js  c++  java
  • Executor并发框架--线程池,ThreadToolExecutor初步

    Executor存在的目的是提供一种将"任务提交""任务如何运行"分离开来的机制。虽然只有一个方法,但是却为灵活且强大的异步任务执行框架提供了基础。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。
    一、线程池
        对于数据库连接,我们经常听到数据库连接池这个概念。因为建立数据库连接时非常耗时的一个操作,其中涉及到网络IO的一些操作。因此就想出把连接通过一个连接池来管理。需要连接的话,就从连接池里取一个。当使用完了,就“关闭”连接,这不是正在意义上的关闭,只是把连接放回到我们的池里,供其他人在使用。所以对于线程,也有了线程池这个概念,其中的原理和数据库连接池是差不多的,因此java jdk中也提供了线程池的功能。
        线程池的作用:线程池就是限制系统中使用线程的数量以及更好的使用线程
        根据系统的运行情况,可以自动或手动设置线程数量,达到运行的最佳效果:配置少了,将影响系统的执行效率,配置多了,又会浪费系统的资源。用线程池配置数量,其他线程排队等候。当一个任务执行完毕后,就从队列中取一个新任务运行,如果没有新任务,那么这个线程将等待。如果来了一个新任务,但是没有空闲线程的话,那么把任务加入到等待队列中。
    为什么要用线程池:
        1.减少线程创建和销毁的次数,使线程可以多次复用
        2.可以根据系统情况,调整线程的数量。防止创建过多的线程,消耗过多的内存(每个线程1M左右)
    Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
    比较重要的几个类:
    ExecutorService
    真正的线程池接口。
    ScheduledExecutorService
    能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。
    ThreadPoolExecutor
    ExecutorService的默认实现。
    ScheduledThreadPoolExecutor
    继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。
        要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。
    java通过Executors提供四种线程池,分别为:
        1、Executors.newCachedThreadPool(),创建一个可缓存线程池,缓冲池容量大小为Integer.MAX_VALUE,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
        2、Executors.newFixedThreadPool(int) 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
        3、newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
        4、Executors.newSingleThreadExecutor() 创建一个单线程化(容量为1)的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    1-1.newSingleThreadExecutor
    Executors类里的两个静态实现方法
    <pre name="code" class="java">    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
        创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。测试:
        /**
         * 4、Executors.newSingleThreadExecutor() 创建一个单线程化(容量为1)的线程池,它只会用唯一的工作线程来执行任务,
         * 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
         */
        public static void main(String[] args) {
            ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                final int index = i;
                singleThreadExecutor.execute(new Runnable() {
                    public void run() {
                        try {
                            System.out.println("ThreadName:" + Thread.currentThread().getName()+","+index);
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    ThreadName:pool-1-thread-1,0
    ThreadName:pool-1-thread-1,1
    ThreadName:pool-1-thread-1,2
    ThreadName:pool-1-thread-1,3
    ThreadName:pool-1-thread-1,4
    ...... 线程名都一样,说明是同一个线程
    1-2.newFixedThreadPool
         Executors类里的构造方法,可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话 maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表明什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。
    <pre name="code" class="java"><pre name="code" class="java">    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
        创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
        public static void main(String[] args) {
            /**
             * 2、Executors.newFixedThreadPool(int) 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
             *
             * 因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
             * 定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
             */
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 10; i++) {
                final int index = i;
                fixedThreadPool.execute(new Runnable() {
                    public void run() {
                        try {
                            System.out.println("ThreadName:" + Thread.currentThread().getName()+","+index);
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    ThreadName:pool-1-thread-1,0
    ThreadName:pool-1-thread-2,1
    ThreadName:pool-1-thread-3,2
    ThreadName:pool-1-thread-3,3
    ThreadName:pool-1-thread-2,4
    ThreadName:pool-1-thread-1,5
    。。。。。。
    1-3.newCachedThreadPool
        该线程池比较适合没有固定大小并且比较快速就能完成的小任务,它将为每个任务创建一个线程。那这样子它与直接创建线程对象(new Thread())有什么区别呢?看到它的第三个参数60L和第四个参数TimeUnit.SECONDS了吗?好处就在于60秒内能够重用已创建的线程。
      //这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。
    //可能对于该BlockingQueue有些陌生,简单说:该 QUEUE中,每个插入操作必须等待另一个线程的对应移除操作。
      public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
        创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒处于等待任务到来)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是Integer的最大值(2^31-1)。
        public static void main(String[] args) {
            /**
             * 1、Executors.newCachedThreadPool(),创建一个可缓存线程池,缓冲池容量大小为Integer.MAX_VALUE,
             * 如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
             *
             * 线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
             */
            ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
            for (int i = 0; i < 10; i++) {
                final int index = i;
                try {
                    Thread.sleep( 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //任务提交
                cachedThreadPool.execute(new Runnable() {
                    public void run() {
                        //线程名一样
                        System.out.println("ThreadName:" + Thread.currentThread().getName()+","+index);
                    }
                });
            }
            /**
             * ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
             * shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
             * shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
             */
            cachedThreadPool.shutdownNow();
        }
    ThreadName:pool-1-thread-1,0
    ThreadName:pool-1-thread-1,1
    ThreadName:pool-1-thread-1,2
    ThreadName:pool-1-thread-1,3
    ThreadName:pool-1-thread-1,4
    ......     线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
    1-4.newScheduledThreadPool
    <pre name="code" class="java">    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
        创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
        public static void main(String[] args) {
            /**
             * 3、newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
             */
            //延迟执行示例代码如下:
    /*        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
            scheduledThreadPool.schedule(new Runnable() {
                public void run() {
                    System.out.println("delay 3 seconds");
                }
            }, 3, TimeUnit.SECONDS);  //延迟3秒后执行run方法。
    */
            // 定期执行示例代码如下:
            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
            scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    System.out.println("delay 1 seconds, and excute every 3 seconds");
                }
            }, 1, 3, TimeUnit.SECONDS); //延迟1秒后每3秒执行一次。
        }
        1> 除了CachedThreadPool使用的是直接提交策略的缓冲队列以外,其余两个用的采用的都是无界缓冲队列,也就说,FixedThreadPool和SingleThreadExecutor创建的线程数量就不会超过 corePoolSize。    
        2> 我们可以再来看看三个线程池采用的ThreadPoolExecutor构造方法都是同一个,使用的都是默认的ThreadFactory和handler: 也就说三个线程池创建的线程对象都是同组,优先权等级为正常的Thread.NORM_PRIORITY(5)的非守护线程,使用的被拒绝任务处理方式是直接抛出异常的AbortPolicy策略
    2、ExecutorService任务周期管理接口
    Executor的实现通常都会创建线程来执行任务,但是使用异步方式来执行任务时,由于之前提交任务的状态不是立即可见的,所以如果要关闭应用程序时,就需要将受影响的任务状态反馈给应用程序。
    
    为了解决执行服务的生命周期问题,Executor扩展了EecutorService接口,添加了一些用于生命周期管理的方法。如下:
    
    public interface ExecutorService extends Executor {  
        void shutdown();  
        List<Runnable> shutdownNow();  
        boolean isShutdown();  
        boolean isTerminated();  
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
        // 省略部分方法  
    } 
    二、ThreadToolExecutor详解
          java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
      在ThreadPoolExecutor类中提供了四个构造方法:
    public class ThreadPoolExecutor extends AbstractExecutorService {
        ....
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
    
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                    Executors.defaultThreadFactory(), defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                    threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                    Executors.defaultThreadFactory(), handler);
        }
    
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            if (corePoolSize < 0 || maximumPoolSize <= 0 ||
                    maximumPoolSize < corePoolSize || keepAliveTime < 0)
                throw new IllegalArgumentException();
    
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
    
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
        ....
     }
        从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作
        corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有 任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者 prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线 程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;在没有设置allowCoreThreadTimeOut为true情况下,核心线程不会销毁;
        maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;该值实际的可设置最大值不是Integer.MAX_VALUE,而是常量CAPACITY
    1> corePoolSize与maximumPoolSize  由于ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小,当新任务在方法 execute(java.lang.Runnable) 中提交时:
               如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的;
          如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池是大小固定的,如果运行的线程与corePoolSize相同,当有新请求过来时,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理
          如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程才创建新的线程去处理请求;
          如果运行的线程多于corePoolSize 并且等于maximumPoolSize,若队列已经满了,则通过handler所指定的策略来处理新请求;
          如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务
    
    也就是说,处理任务的优先级为: 
        1. 核心线程corePoolSize > 任务队列workQueue > 最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
        2. 当池中的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。
    
        keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize 时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。即当线程池中的线程数大于corePoolSize 时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize 时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
    2> workQueue 线程池所使用的缓冲队列,该缓冲队列的长度决定了能够缓冲的最大数量,缓冲队列有三种通用策略:
      1) 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
       2) 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
       3) 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量.
    
        unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
    TimeUnit.DAYS;               //
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    TimeUnit.SECONDS.sleep(1);相当于 Thread.sleep(1000);
        workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
    ArrayBlockingQueue; //有界队列
    LinkedBlockingQueue; //无界队列
    SynchronousQueue; //特殊的一个队列,只有存在等待取出的线程时才能加入队列,可以说容量为0,是无界队列
         ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
        threadFactory:线程工厂,主要用来创建线程;
    3>ThreadFactory   使用 ThreadFactory 创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
        handler:表示当拒绝处理任务时的策略,有以下四种取值:
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
    ThreadPoolExecutor是Executors类的底层实现。
    在JDK帮助文档中,有如此一段话:
        “强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)
        它们均为大多数使用场景预定义了设置。”
    
    通过上面的4个构造函数创建完线程池后,就可以通过submit或execute方法提交任务;
    工作完成后当然最后要关闭线程池,可以调用下面两个方法:
      - shutdown():对已经在执行的线程进行关闭,不再接收新的任务;
      - shutdownNow():尝试停止正在执行的线程,会清除任务队列中的任务;
    以上两个方法都不会等到所有任务完成后关闭,需要通过awaitTermination(long, TimeUnit)方法实现;
  • 相关阅读:
    redis持久化RDB和AOF
    线程同步的几种方法
    JRE和JDK的区别
    Spring-两种配置容器
    为什么String类是不可变的?
    Oracle 每五千条执行一次的sql语句
    Executor , ExecutorService 和 Executors
    常见框架单例、多例与线程安全性总结
    mysql 的S 锁和X锁的区别
    linux下使用shell脚本自动化部署项目
  • 原文地址:https://www.cnblogs.com/hoobey/p/7559912.html
Copyright © 2011-2022 走看看