zoukankan      html  css  js  c++  java
  • Java并发:搞定线程池(上)

    原文地址:https://www.nowcoder.com/discuss/152050?type=0&order=0&pos=6&page=0

    本文是在原文的基础+理解,想要系统学习,请看原文地址。

    线程池介绍

      1.1 线程池的概念

        线程池(thread pool): 一种线程使用模式。线程的创建销毁是十分消耗资源的(线程创建消耗内存、线程上下文切换从消耗CPU资源)。使用线程池可以更加充分的协调应用CPU、内存、网络、I/O等系统资源。在程序启动首先创建线程,在程序启动后可以将任务直接扔到线程池中,由线程池来执行这个任务。

      1.2 线程池的解决的问题

        ①线程创建销毁会开辟祸首虚拟机栈、本地方法栈和程序技术器等线程私有内存。

        ②如不加控制,线程数量在达到一定值时,由于线程上下文切换需要互斥、通信,会造成CPU资源浪费,极端情况下,系统分配的时间片都用来完成上下文切换,而没有时间去执行真正的任务。

        ③频繁的创建销毁线程增加并发编程的风险。

        ④线程池可以解决线程过多时的等待或友好的决绝服务。

      1.3 线程池作用

        ①利用线程池管理复用线程,控制最大并发数。(避免由于先吃过多,造成完成上下文切换而真正任务不执行)

        ②实现任务线程队列缓存策略和拒绝机制。(优雅的处理线程过多情况)

        ③实现某些与实践相关的功能。(可以利用线程池完成定时任务)

        ④隔离线程环境。(可以将不同类型的线程池放到一台服务器,隔离不同服务,避免线程相互影响)

      1.4 线程池的有点

        ①降低资源消耗,避免频繁的内存申请和销毁

        ②提高响应速度,不需要完成线程的创建,直接执行任务(前提是线程池中有空闲的线程)

        ③提高线程的可管理性,使用线程池可以同理管理、分配、调优和监控。

    线程池创建

        首先从ThreadPoolExecutor构造方法来讲自定义ThreadFactory和RejectedExecutionHandler,编写一个最简单的线程池。通过ThreadPoolExecutor的execute和addWorker两个核心方法。

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
    ......
    }

      参数说明:

        ①corePoolSize:表示核心线程数:

            corePoolSize=0,则任务执行完之后没有其他任务进入会销毁鲜线程池中的线程

            corePoolSize>0,本地任务执行完毕后,线程池会保留corePoolSize个线程

        ②maximumPoolSize:表示线程池能够容纳同时执行的最大线程数

            maximumPoolSize>=1,如果线程池中的线程执行,则数量大于等于1.

            如果需要执行的线程数大于maximumPoolSize个线程,需要第五个参数handler来处理,缓存还是 丢弃。

        ③keepAliveTime:表示线程池中的线程空闲时间

            当空闲时间达到keepAliveTime时,同时线程池中线程数大于信合线程数,则这个线程会被销毁。

            利用这个参数,可以避免资源的浪费。

            当allowCoreThreadTimeOut = true,核心线程超时也会被回收。

            由源码可知,allowCoreThreadTimeOut是由volatile修饰,说明这个值变化对说有线程池中的线程可见,由于没有赋初值,默认值是false,表示核心线程超时不会被回收。

            可以由方法设定,如果设置为true时,要保证线程空闲时间<=0,即表示空闲立马回收。

        private volatile boolean allowCoreThreadTimeOut;
                              
        public void allowCoreThreadTimeOut(boolean value) {
            if (value && keepAliveTime <= 0)
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    
            allowCoreThreadTimeOut = value;
        }

        ④unit:表示时间单位

            keepAliveTime的单位通常是TimeUnit.SECONDS. 

        ⑤workQueue:表示缓存队列

            当请求的线程数大于最大的线程并发数,线程会进入这个blockingQueue,阻塞保证出队入队的原子性

        ⑥threadFactory:线程工厂

            用来生产一组相同任务的线程。

            线程池的命名是通过给这个factory增加组名前缀来实现的。

            虚拟机栈在分析时知道线程任务是那个线程工厂生产的。

        ⑦handler:表示拒绝策略的对象

            当待执行的线程大于阻塞队列的最大值时,可以通过该策略处理请求,一种简单的限流保护,策略模式。

          优雅的拒绝策略包括:        

            保存数据库进行消费填谷;在空闲的事再提取出来执行

            转向某个提示页面

            打印日志

    2.1.1 corePoolSize 核心线程数量

        线程池中应该报纸的线程数量,即使线程处于空闲期间,线程也会存在于线程池,除非设置allowCoreThreadTimeOut这个参数是false。当线程池中的线程数量少于核心线程数量,线程池会创建一个新的线程来执行这个任务,即使线程池存在空闲线程也会创建新线程。等线程池中的线程数量等于信心线程数量,这时不会再产生线程,任务会被放在队列中。

        如果嗲用线程池的prestartAllCoreThreads(),线程池会提前创建并启动所有核心线程。

        public int prestartAllCoreThreads() {
            int n = 0;
            while (addIfUnderCorePoolSize(null))
                ++n;
            return n;
        }

    2.1.2 maximumPoolsize 线程池最大线程数,同时执行的线程

        线程池允许创建的最大线程数:最大线程数>=1

            在固定队列长度时,最大线程数作用:用来在任务数>核心线程数+队列长度&&任务数<=最大线程数+队列长度;

            如果不是固定队列这个值没有意义,任务会进入阻塞队列。

    2.1.3 workQueue阻塞队列,保存任务核心线程处理不力的任务

        存储待执行任务的阻塞队列,这些任务必须是Runnable对象(如果是Callable对象会在submit内部转weiRunnable对象)。

        Runnable TaskQueue(任务队列):用于保存等待执行的任务的阻塞队列,可选择下阻塞队列。

            LinkedBlockingQueue:一个基于链表结构的阻塞队列,队列按照FIFO排序元素,吞吐量通常高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用这个队列。

            SynchronusQueue:一个不存储元素的阻塞队列,每个插入操作必须等待另外一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue。静态工厂方法Executors.newCachedThredPool使用这个队列。

     2.1.4 threadFactory线程工厂

        ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每一个创建出来的线程设置更有意义的名字。例如guava提供的ThreadFactoryBuilder快速给线程池里的线程设置有意义的名字,代码如下、

    new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

     2.1.5 RejectedExecutionHandler 拒绝策略

        采用了策略模式,当队列固定时,任务数>最大线程+队列长度时,这是再进来的任务需要一种策略来处理新提交的任务。默认策略AbortPolicy,表示处理新任务时抛出异常。

        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();

        在JDK5中线程池提供了4中策略

        AbortPolicy:丢弃任务,抛出RejectedExecuptionException,默认的策略

        CallerRunsPolicy:只用调用者所在线程来运行任务,有反馈机制,使任务提交的速度变慢,多任务下上下文切换耗时。

        DiscardOldestPolicy:若没有发生shutdown,尝试丢弃队列里最近的一个任务,并执行当前任务,丢弃任务缓存队列中最老的任务,并尝试重新提交新的任务。

        DiscardPolicy:不处理,丢弃,拒绝执行,不抛异常。

        也可以自定义类实现RejectedExecutionHandler接口,定制符合业务场景的拒绝策略,比如存库,日志记录等。

     2.1.6 keepAliveTime 线程活动保持的时间

        非核心线程线程任务执行结束后,保持多久的存活时间后被回收。

        如果回收核心线程,需要设置allowCoreThreadTimeOut=true,同时keepAliveTime设置为0。

        如果任务喝多,并且任务执行的时间较短,可以调大时间,提高线程的利用率。

     2.1.7 TImeUnit 线程活动保持单位

        指示第三个参数keepAliveTime时间的单位,常用TimeUnit.SECONDS(秒),这个类是JUC下的类,可以使用这个类来完成线程Thread.sleep(n)功能,对应TimeUnit.seconds.sleep(n).

    在实际编程中使用Exceutors这个线程池里的静态工厂来创建线程池。

    Executors工具类

        ExecutorService的抽象类AbstractExecutorService提供submit、invokeAll等方法的实现;核心方法executr没有在这里实现,因为所有的任务都执行改方法,类似模板方式的形式,不同实现会带来不同的执行策略。

        通过Executors的静态工厂可以创建三个线程洗的包装对象

        ①ForkJoinPool

        ②ThreadPoolExecutor

        ③ScheduledThreadPoolExecutor

     3.1.1 Executors.newWorkStealingPool

        JDK8引入,创建持有足够线程的线程池支持给定的并行度;

         通过多个队列减少竞争;

         构造方法中的CPU数量默认设置为默认的并行度。

         返回ForkJoinPool对象。

        public static ExecutorService newWorkStealingPool(int parallelism) {
            return new ForkJoinPool
                (parallelism,
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }

     3.1.2 Executors.newCachedThreadPool

        maxinumPoolSize最大可以取Integer的最大值,是高度可伸缩的线程池。

        核心线程数0,最大线程数Integer最大值,默认空闲时间60s,如果任务数增加,再次创建新的线程处理任务。线程数达到上限出现OOM。

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

     3.1.3 Executors.newScheduledThreadPool

        指定核心线程数,最大线程数是Integer最大值,没有空闲时间,阻塞队列时延迟的。

        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }

    3.1.4 Executors.newWorkStealingPool

        输入的参数即使固定线程数,核心线程和最大线程数相等,不存在空闲线程,等待时间是0.

        因为使用LinkedBlockingQueue,不会因为线程数量发生OOM。

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

     3.1.5 Executors.newSingleThreadExecutor

        核心线程数=最大线程数=1,没有空闲时间,采用无界的LInkedBlockingQueue,不会因线程数出现OOM,但瞬间请求郭大师,会有OOM风险,相当于单线程串行执行所有任务,保证任务的提交按照提交顺序依次执行。

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

     3.2.1 线程工厂

        Executors中的默认的线程工厂和拒绝策略过于简单,对用户不够友好。可以自定义,下面是默认的线程工厂,可以定制化自己的线程工厂。

       static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }

     3.2.2 拒绝策略

        public static class AbortPolicy implements RejectedExecutionHandler {
            /**
             * Creates an {@code AbortPolicy}.
             */
            public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
  • 相关阅读:
    继承
    JAVA接口的继承与集合
    JAVA接口
    c++程序—敲桌子
    c++程序—水仙花数
    c++程序—while猜数字游戏
    c++程序—switch分支
    c++程序—三目运算符
    c++程序—if语句实践
    c++程序—选择结构
  • 原文地址:https://www.cnblogs.com/dc-earl/p/11122370.html
Copyright © 2011-2022 走看看