zoukankan      html  css  js  c++  java
  • Executor并发框架--线程池,ThreadToolExecutor初步

    Executor存在的目的是提供一种将"任务提交""任务如何运行"分离开来的机制。虽然只有一个方法,但是却为灵活且强大的异步任务执行框架提供了基础。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。
    一、线程池
        对于数据库连接,我们经常听到数据库连接池这个概念。因为建立数据库连接时非常耗时的一个操作,其中涉及到网络IO的一些操作。因此就想出把连接通过一个连接池来管理。需要连接的话,就从连接池里取一个。当使用完了,就“关闭”连接,这不是正在意义上的关闭,只是把连接放回到我们的池里,供其他人在使用。所以对于线程,也有了线程池这个概念,其中的原理和数据库连接池是差不多的,因此java jdk中也提供了线程池的功能。
        线程池的作用:线程池就是限制系统中使用线程的数量以及更好的使用线程
        根据系统的运行情况,可以自动或手动设置线程数量,达到运行的最佳效果:配置少了,将影响系统的执行效率,配置多了,又会浪费系统的资源。用线程池配置数量,其他线程排队等候。当一个任务执行完毕后,就从队列中取一个新任务运行,如果没有新任务,那么这个线程将等待。如果来了一个新任务,但是没有空闲线程的话,那么把任务加入到等待队列中。
    为什么要用线程池:
        1.减少线程创建和销毁的次数,使线程可以多次复用
        2.可以根据系统情况,调整线程的数量。防止创建过多的线程,消耗过多的内存(每个线程1M左右)
    Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
    比较重要的几个类:
    ExecutorService
    真正的线程池接口。
    ScheduledExecutorService
    能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。
    ThreadPoolExecutor
    ExecutorService的默认实现。
    ScheduledThreadPoolExecutor
    继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。
        要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。
    java通过Executors提供四种线程池,分别为:
        1、Executors.newCachedThreadPool(),创建一个可缓存线程池,缓冲池容量大小为Integer.MAX_VALUE,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
        2、Executors.newFixedThreadPool(int) 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
        3、newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
        4、Executors.newSingleThreadExecutor() 创建一个单线程化(容量为1)的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    1-1.newSingleThreadExecutor
    Executors类里的两个静态实现方法
    <pre name="code" class="java">    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
        创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。测试:
        /**
         * 4、Executors.newSingleThreadExecutor() 创建一个单线程化(容量为1)的线程池,它只会用唯一的工作线程来执行任务,
         * 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
         */
        public static void main(String[] args) {
            ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                final int index = i;
                singleThreadExecutor.execute(new Runnable() {
                    public void run() {
                        try {
                            System.out.println("ThreadName:" + Thread.currentThread().getName()+","+index);
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    ThreadName:pool-1-thread-1,0
    ThreadName:pool-1-thread-1,1
    ThreadName:pool-1-thread-1,2
    ThreadName:pool-1-thread-1,3
    ThreadName:pool-1-thread-1,4
    ...... 线程名都一样,说明是同一个线程
    1-2.newFixedThreadPool
         Executors类里的构造方法,可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话 maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表明什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。
    <pre name="code" class="java"><pre name="code" class="java">    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
        创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
        public static void main(String[] args) {
            /**
             * 2、Executors.newFixedThreadPool(int) 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
             *
             * 因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
             * 定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
             */
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 10; i++) {
                final int index = i;
                fixedThreadPool.execute(new Runnable() {
                    public void run() {
                        try {
                            System.out.println("ThreadName:" + Thread.currentThread().getName()+","+index);
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    ThreadName:pool-1-thread-1,0
    ThreadName:pool-1-thread-2,1
    ThreadName:pool-1-thread-3,2
    ThreadName:pool-1-thread-3,3
    ThreadName:pool-1-thread-2,4
    ThreadName:pool-1-thread-1,5
    。。。。。。
    1-3.newCachedThreadPool
        该线程池比较适合没有固定大小并且比较快速就能完成的小任务,它将为每个任务创建一个线程。那这样子它与直接创建线程对象(new Thread())有什么区别呢?看到它的第三个参数60L和第四个参数TimeUnit.SECONDS了吗?好处就在于60秒内能够重用已创建的线程。
      //这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。
    //可能对于该BlockingQueue有些陌生,简单说:该 QUEUE中,每个插入操作必须等待另一个线程的对应移除操作。
      public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
        创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒处于等待任务到来)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是Integer的最大值(2^31-1)。
        public static void main(String[] args) {
            /**
             * 1、Executors.newCachedThreadPool(),创建一个可缓存线程池,缓冲池容量大小为Integer.MAX_VALUE,
             * 如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
             *
             * 线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
             */
            ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
            for (int i = 0; i < 10; i++) {
                final int index = i;
                try {
                    Thread.sleep( 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //任务提交
                cachedThreadPool.execute(new Runnable() {
                    public void run() {
                        //线程名一样
                        System.out.println("ThreadName:" + Thread.currentThread().getName()+","+index);
                    }
                });
            }
            /**
             * ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
             * shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
             * shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
             */
            cachedThreadPool.shutdownNow();
        }
    ThreadName:pool-1-thread-1,0
    ThreadName:pool-1-thread-1,1
    ThreadName:pool-1-thread-1,2
    ThreadName:pool-1-thread-1,3
    ThreadName:pool-1-thread-1,4
    ......     线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
    1-4.newScheduledThreadPool
    <pre name="code" class="java">    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
        创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
        public static void main(String[] args) {
            /**
             * 3、newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
             */
            //延迟执行示例代码如下:
    /*        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
            scheduledThreadPool.schedule(new Runnable() {
                public void run() {
                    System.out.println("delay 3 seconds");
                }
            }, 3, TimeUnit.SECONDS);  //延迟3秒后执行run方法。
    */
            // 定期执行示例代码如下:
            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
            scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    System.out.println("delay 1 seconds, and excute every 3 seconds");
                }
            }, 1, 3, TimeUnit.SECONDS); //延迟1秒后每3秒执行一次。
        }
        1> 除了CachedThreadPool使用的是直接提交策略的缓冲队列以外,其余两个用的采用的都是无界缓冲队列,也就说,FixedThreadPool和SingleThreadExecutor创建的线程数量就不会超过 corePoolSize。    
        2> 我们可以再来看看三个线程池采用的ThreadPoolExecutor构造方法都是同一个,使用的都是默认的ThreadFactory和handler: 也就说三个线程池创建的线程对象都是同组,优先权等级为正常的Thread.NORM_PRIORITY(5)的非守护线程,使用的被拒绝任务处理方式是直接抛出异常的AbortPolicy策略
    2、ExecutorService任务周期管理接口
    Executor的实现通常都会创建线程来执行任务,但是使用异步方式来执行任务时,由于之前提交任务的状态不是立即可见的,所以如果要关闭应用程序时,就需要将受影响的任务状态反馈给应用程序。
    
    为了解决执行服务的生命周期问题,Executor扩展了EecutorService接口,添加了一些用于生命周期管理的方法。如下:
    
    public interface ExecutorService extends Executor {  
        void shutdown();  
        List<Runnable> shutdownNow();  
        boolean isShutdown();  
        boolean isTerminated();  
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
        // 省略部分方法  
    } 
    二、ThreadToolExecutor详解
          java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
      在ThreadPoolExecutor类中提供了四个构造方法:
    public class ThreadPoolExecutor extends AbstractExecutorService {
        ....
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
    
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                    Executors.defaultThreadFactory(), defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                    threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                    Executors.defaultThreadFactory(), handler);
        }
    
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            if (corePoolSize < 0 || maximumPoolSize <= 0 ||
                    maximumPoolSize < corePoolSize || keepAliveTime < 0)
                throw new IllegalArgumentException();
    
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
    
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
        ....
     }
        从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作
        corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有 任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者 prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线 程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;在没有设置allowCoreThreadTimeOut为true情况下,核心线程不会销毁;
        maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;该值实际的可设置最大值不是Integer.MAX_VALUE,而是常量CAPACITY
    1> corePoolSize与maximumPoolSize  由于ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小,当新任务在方法 execute(java.lang.Runnable) 中提交时:
               如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的;
          如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池是大小固定的,如果运行的线程与corePoolSize相同,当有新请求过来时,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理
          如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程才创建新的线程去处理请求;
          如果运行的线程多于corePoolSize 并且等于maximumPoolSize,若队列已经满了,则通过handler所指定的策略来处理新请求;
          如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务
    
    也就是说,处理任务的优先级为: 
        1. 核心线程corePoolSize > 任务队列workQueue > 最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
        2. 当池中的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。
    
        keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize 时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。即当线程池中的线程数大于corePoolSize 时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize 时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
    2> workQueue 线程池所使用的缓冲队列,该缓冲队列的长度决定了能够缓冲的最大数量,缓冲队列有三种通用策略:
      1) 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
       2) 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
       3) 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量.
    
        unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
    TimeUnit.DAYS;               //
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    TimeUnit.SECONDS.sleep(1);相当于 Thread.sleep(1000);
        workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
    ArrayBlockingQueue; //有界队列
    LinkedBlockingQueue; //无界队列
    SynchronousQueue; //特殊的一个队列,只有存在等待取出的线程时才能加入队列,可以说容量为0,是无界队列
         ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
        threadFactory:线程工厂,主要用来创建线程;
    3>ThreadFactory   使用 ThreadFactory 创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
        handler:表示当拒绝处理任务时的策略,有以下四种取值:
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
    ThreadPoolExecutor是Executors类的底层实现。
    在JDK帮助文档中,有如此一段话:
        “强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)
        它们均为大多数使用场景预定义了设置。”
    
    通过上面的4个构造函数创建完线程池后,就可以通过submit或execute方法提交任务;
    工作完成后当然最后要关闭线程池,可以调用下面两个方法:
      - shutdown():对已经在执行的线程进行关闭,不再接收新的任务;
      - shutdownNow():尝试停止正在执行的线程,会清除任务队列中的任务;
    以上两个方法都不会等到所有任务完成后关闭,需要通过awaitTermination(long, TimeUnit)方法实现;
  • 相关阅读:
    背水一战 Windows 10 (90)
    背水一战 Windows 10 (89)
    背水一战 Windows 10 (88)
    背水一战 Windows 10 (87)
    背水一战 Windows 10 (86)
    背水一战 Windows 10 (85)
    背水一战 Windows 10 (84)
    背水一战 Windows 10 (83)
    背水一战 Windows 10 (82)
    背水一战 Windows 10 (81)
  • 原文地址:https://www.cnblogs.com/hoobey/p/7559912.html
Copyright © 2011-2022 走看看