zoukankan      html  css  js  c++  java
  • java线程池

    线程池

    线程池是什么?

    线程池的概念是初始化线程池时在池中创建空闲的线程,一旦有工作任务,可直接使用线程池中的线程进行执行工作任务,任务执行完成后又返回线程池中成为空闲线程。使用线程池可以减少线程的创建和销毁,提高性能。

        举个例子:我是一个包工头,代表线程池,手底下有若干工人代表线程池中的线程。如果我没接到项目,那么工人就相当于线程池中的空闲线程,一但我接到了项目,我可以立刻让我手下的工人去工作,每个工人同一时间执行只执行一个工作任务,执行完了就去执行另一个工作任务,直到没有工作任务了,这时工人就可以休息了。

    队列是什么?

    队列作为一个缓冲的工具,当没有足够的线程去处理任务时,可以将任务放进队列中,以队列先进先出的特性来执行工作任务

    举个例子,我又是一个包工头,一开始我只接了一个小项目,所以只有三个工作任务,但我手底下有四个工人,那么其中三人各领一个工作任务去执行就好了,剩下一个人就先休息。但突然我又接到了几个大项目,那么有现在有很多工作任务了,但手底下的工人不够啊。

    那么我有两个选择:

    (1)雇佣更多的工人

    (2)把工作任务记录下来,按先来后到的顺序执行

    但雇佣更多等工人需要成本啊,对应到计算机就是资源的不足,所以我只能把工作任务先记录下来,这样就成了一个队列了。

    为什么要使用线程池?

    假设我又是一个包工头,我现在手底下没有工人了,但我接到了一个项目,有了工作任务要执行,那我肯定要去找工人了,但招人成本是很高的,工作完成后还要给遣散费,这样算起来好像不值,所以我事先雇佣了固定的几个工人作为我的长期员工,有工作任务就干活,没有就休息,如果工作任务实在太多,那我也可以再临时雇佣几个工人。一来二去工作效率高了,付出的成本也低了。Java自带的线程池的原理也是如此。

    Java自带的线程池

    Executor接口是Executor的父接口,基于生产者--消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者,如果要在程序中实现一个生产者--消费者的设计,那么最简单的方式通常是使用Executor。

    Executor框架

    Executor框架是java中的线程实现。Executor是最顶层的接口定义,它的子类和实现主要包括ExecutorService、ScheduledExecutorService、ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool等。其结构如下图所示:

    Executor:Executor是一个接口,其只定义了一个execute()方法:void execute(Runnable command);,只能提交Runnable形式的任务,不支持提交Callable带有返回值的任务。
    ExecutorService:ExecutorService在Executor的基础上加入了线程池的生命周期管理,我们可以通过ExecutorService#shutdown或者ExecutorService#shutdownNow方法来关闭我们的线程池。ExecutorService支持提交Callable形式的任务,提交完Callable任务后我们拿到一个Future,它代表一个异步任务执行的结果。

    ThreadPoolExecutor:是线程池中最核心的类,这个类的各个构造参数

    1. corePoolSize:线程池的核心线程数目,当一个请求进来时如果当前线程池中线程数量小于这个值,则直接通过ThreadFactory新建一个线程来处理这个请求,如果已经有线程数量大于等于这个值则将请求放入阻塞队列中。
    2. maximumPoolSize:线程池的最大线程数目,当线程池数量已经等于corePoolSize并且阻塞队列已经满了,则看线程数量是否小于maximumPoolSize:如果小于则创建一个线程来处理请求,否则使用"饱和策略"来拒绝这个请求。对于大于corePoolSize部分的线程称这部分线程为"idle threads",这部分线程会有一个最大空闲时间,如果超过这个空闲时间还没有人任务进来则将这些空闲线程回收。
    3. keepAliveTime和unit:这两个参数主要用来控制idle threads的最大空闲时间,超过这个空闲时间空闲线程将被回收。这里有一点需要注意,ThreadPoolExecutor中有一个属性:private volatile Boolean allowCoreThreadTimeOut;这个用来指定是否允许核心线程空闲超时回收,默认为false,即不允许核心线程超时回收,核心线程将一直等待新任务。如果设置这个参数为true,核心线程空闲超时后也可以被回收。
    4. workQueue:阻塞队列,超过corePoolSize部分的请求放入这个阻塞队列中等待执行。阻塞对队列分为有界阻塞队列和无界阻塞队列。在创建阻塞队列时如果我们指定了这个队列的"capacity"则这个队列就是有界的,否则无界。
    5. threadFactory:是一个线程池工厂,主要用来为线程池创建线程,我们可以定制一个ThreadFactory来达到统一命名我们线程池中的线程的目的。
    6. handler:饱和策略,用来拒绝多余的请求。饱和策略有:CallerRunsPolicy:请求脱离线程池运行(调用者caller线程来运行这个任务);AbortPolicy:抛出RejectedExecutionException异常;DiscardPolicy:丢弃这个任务,即什么也不做;DiscardOldestPolicy:将阻塞队列中等待时间最久的任务删除(即队列头部的任务),将新的任务加入队尾。

    线程池的工作流程图

    1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

    2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

    3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

    4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

    Executor框架实现线程池的原理

    线程池内部状态

    其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
    1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
    2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
    3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
    4、TIDYING : 2 << COUNT_BITS,即高3位为010;
    5、TERMINATED: 3 << COUNT_BITS,即高3位为011;
    任务提交

    线程池框架提供了两种方式提交任务,根据不同的业务需求选择不同的方式。

    Executor.excute()

    通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功。

    ExecutorService.submit()

    通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。

    任务执行

    当向线程池中提交一个任务,线程池会如何处理该任务?

    execute实现

    具体的执行流程如下:

    1.workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务;否则执行步骤(2);
    2.如果线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中,则执行步骤(3),否则执行步骤(4);
    3.再次检查线程池的状态,如果线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务;
    4.执行addWorker方法创建新的线程执行任务,如果addWoker执行失败,则执行reject方法处理任务;

    addWorker实现

    从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务,代码实现如下:

    这只是addWorker方法实现的前半部分:

    1、判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回;
    2、通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,则跳出循环,开始创建新的线程,具体实现如下:

    线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程,其中Worker类设计如下:
    1、继承了AQS类,可以方便的实现工作线程的中止操作;
    2、实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
    3、当前提交的任务firstTask作为参数传入Worker的构造方法;

    从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。

    runWorker实现

    runWorker方法是线程池的核心:
    1、线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;
    2、获取第一个任务firstTask,执行任务的run方法,不过在执行任务之前,会进行加锁操作,任务执行完会释放锁;
    3、在执行任务的前后,可以根据业务场景自定义beforeExecute和afterExecute方法;
    4、firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
    getTask实现

    整个getTask操作在自旋下完成:
    1、workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
    2、workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;

    所以,线程池中实现的线程可以一直执行由用户提交的任务。

    自己实现线程池

    根据Executor框架实现线程池的原理,我们可以自己动手实现

    实现的思路如下:

     

     

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    
    public class ThreadPoolExecutor {
        
        /*
         * BlockingQueue是阻塞队列,在两种情况下出现阻塞:
         *     1、当队列满了,入队列操作时;
         *     2、当队列空了,出队列操作时。
         * 阻塞队列是线程安全的,主要使用在生产/消费者的场景
         */
        private BlockingQueue<Task> blockingQueue;
        
        //线程池的工作线程数(可以认为是线程池的容量)
        private int poolSize = 0;
        
        //线程池的核心容量(也就是当前线程池中真正存在的线程个数)
        private int coreSize = 0;
        
        /*
         * 此地方使用volatile关键字,volatile的工作原理是:对于JVM维度来说,每个线程持有变量的工作副本,那对于计算机维度来说,
         * 就是这些变量的中间值会存放在高速缓存中。通过volatile关键字,告知每个线程改变此变量之后,立马更新到内存中去,并且使得
         * 缓存中的数据失效,这样来保证其中某个线程改变公有变量后,其他线程能及时读取到最新的变量值,从而保证可见性。
         * 原因如下:
         *     1、在ThreadPoolExecutorTest中操作shutDown,这是main线程操作此变量(由于变量是volatile声明,所以会立马写入内存中);
         *     2、Worker中线程通过while(!shutDown)来判断当前线程是否应该关闭,因此需通过volatile保证可见性,使线程可以及时得到关闭。
         */
        private volatile boolean shutDown = false;
        
        public ThreadPoolExecutor(int size) {
            this.poolSize = size;
            //LinkedBlockingQueue的大小可以指定,不指定即为无边界的。
            blockingQueue = new LinkedBlockingQueue<>(poolSize);
        }
        
        public void execute(Task task) throws InterruptedException {
            if(shutDown == true) {
                return;
            }
            
            if(coreSize < poolSize) {
                /*
                 * BlockingQueue中的插入主要有offer(obj)以及put(obj)两个方法,其中put(obj)是阻塞方法,如果插入不能马上进行,
                 * 则操作阻塞;offer(obj)则是插入不能马上进行,返回true或false。
                 * 本例中的Task不允许丢失,所以采用put(obj);
                 */
                blockingQueue.put(task);
                produceWorker(task);
            }else {
                blockingQueue.put(task);
            }
        }
    
        private void produceWorker(Task task) throws InterruptedException {
            if(task == null) {
                throw new NullPointerException("非法参数:传入的task对象为空!");
            }
    
            Thread thread = new Thread(new Worker());        
            thread.start();
            coreSize++;
        }
        
        /*
         * 真正中断线程的方法,是使用共享变量发出信号,告诉线程停止运行。
         * 
         */
        public void shutDown() {
            shutDown = true;
        }
        
        /*
         * 此内部类是实际上的工作线程
         * 
         */
        class Worker implements Runnable {
    
            @Override
            public void run() {        
                while(!shutDown) {    
                    try {
                        //
                        blockingQueue.take().doJob();
                    } catch (InterruptedException e) {                    
                        e.printStackTrace();
                    }
                }            
                System.out.println("线程:" + Thread.currentThread().getName() + "退出运行!");
            }    
        }
    }
    public class Task {
    
        //通过taskId对任务进行标识
        private int taskId;
        
        public Task(int taskId) {
            this.taskId = taskId;
        }
    
        public void doJob() {
            System.out.println("线程" + Thread.currentThread().getName() + "正在处理任务!");
        }
    
        public int getId() {        
            return taskId;
        }
    }
    package com.ty.thread;
    
    /**
     */
    public class ThreadPoolExecutorTest {
    
        public static void main(String[] args) throws InterruptedException {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3);
            for(int i = 0; i < 10; i++) {
                Task task = new Task(i);
                threadPoolExecutor.execute(task);                
            }        
            
            threadPoolExecutor.shutDown();
        }
    }

    运行结果

    线程Thread-0正在处理任务!

    线程Thread-1正在处理任务!

    线程Thread-0正在处理任务!

    线程Thread-1正在处理任务!

    线程Thread-2正在处理任务!

    线程Thread-0正在处理任务!

    线程Thread-1正在处理任务!

    线程:Thread-1退出运行!

    线程:Thread-0退出运行!

    线程Thread-2正在处理任务!

    线程:Thread-2退出运行!

  • 相关阅读:
    BeanUtils
    eclipse导出说明文档
    MYSQL5.7的安装
    如何生成Android的keystore文件
    keystore
    安卓开发eclipse如何导出项目
    常用十六进制颜色对照表代码查询
    CheckBox
    RadioGroup和RadioButton
    TStringList 与 泛型字典TDictionary 的 哈希功能效率PK
  • 原文地址:https://www.cnblogs.com/kexinxin/p/11569981.html
Copyright © 2011-2022 走看看