zoukankan      html  css  js  c++  java
  • java多线程:线程池原理、阻塞队列


    一、线程池定义和使用


    jdk 1.5 之后就引入了线程池。

    1.1 定义


    从上面的空间切换看得出来,线程是稀缺资源,它的创建与销毁是一个相对偏重且耗资源的操作,而Java线程依赖于内核线程,创建线程需要进行操作系统状态切换。为避免资源过度消耗需要设法重用线程执行多个任务。线程池就是一个线程缓存,负责对线程进行统一分配、调优与监控。(数据库连接池也是一样的道理)

    什么时候使用线程池?

    单个任务处理时间比较短;需要处理的任务数量很大。

    线程池优势?

    • 重用存在的线程,减少线程创建、消亡的开销,提高性能、提高响应速度。
    • 当任务到达时,任务可以不需要等到线程创建就能立即执行。
    • 提高线程的可管理性,可统一分配,调优和监控。

    1.2 线程池在 jdk 已有的实现


    • 在 juc 包下,有一个接口:Executor :
    • Executor 又有两个子接口:ExecutorService 和 ScheduledExecutorService,常用的接口是 ExecutorService。
    • 同时常用的线程池的工具类叫 Executors。

    例如:

    ExecutorService service = Executors.newCachedThreadPool();
    

    Executor 框架虽然提供了如 newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()、newScheduledThreadPool() 等创建线程池的方法,但都有其局限性,不够灵活。

    上面的几种方式点进去会发现,都是用 ThreadPoolExecutor 进行创建的:

    1. newSingleThreadExecutor 字面意思 简单线程执行器
    1. newFixedThreadPool 字面意思固定的线程池,传参就是线程固定数目,适用于执行长期任务的场景。
    1. newCachedThreadPool 字面意思 缓存线程池,核心线程0,最大线程非常大,动态创建的特点。
    1. newScheduledThreadPool 字面意思 时间安排线程池,指定核心线程数。
    1. newSingleThreadScheduledExecutor 字面意思 单线程安排执行器,也就是基于只有一个核心线程的执行器之外,又可以扩展。其中又用 DelegatedExecutorService 委托执行器服务进行了包装。

    可以看到,上面直接用 Executors 工具类默认的一些实现 new 出来的线程池都是用的 ThreadPoolExecutor 线程执行器这个类进行构造的,不过参数不同,导致了效果的侧重点不同。

    因此,自己创建线程池推荐的方法就是,直接使用 ThreadPoolExecutor 进行个性化的创建:

    构造方法种的参数有 7 个:

    • corePoolSize:线程池维护线程的最少数量 (core : 核心
    • maximumPoolSize:线程池维护线程的最大数量,显然必须>=1
    • keepAliveTime:线程池维护的多余的线程所允许的空闲时间,最长可以空闲多久,时间到了,如果超过 corePoolSize 的线程一直空闲,他们就会被销毁。
    • unit:线程池维护线程所允许的空闲时间的单位
    • workQueue:线程池所使用的缓冲队列,已经提交但是没有执行的任务会放进这里
    • threadFactory:生成线程池种工作线程的线程工厂,一般使用默认
    • handler:线程池对拒绝任务的处理策略,当队列满且工作线程已经达到maximumPoolSize。

    阿里的 java 开发手册,强制要求,通过 ThreadPoolExecutor 来自定义,不能使用内置的,避免资源耗尽。这个很好理解,1 的类型就只有一个核心线程和最大现场,2 没有扩展性,3、4、5的最大线程数太大,内存会爆炸。

    1.3 线程池使用方法


    这里我们用固定线程池来测试,传入核心线程数为 5,最大数量自然就也是 5,

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        try {
            //模拟10个顾客办理业务
            for (int i = 0; i < 10; i++){
                //execute 执行方法,传入参数为实现了 Runnable 接口的类
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"号线程办理业务");
                });
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
    

    其中,execute 方法就是将任务提交的方法,我们用 lambda 表达式给 execute 方法传入了参数,实际上相当于一个完整的实现了 Runnable 接口的类。

    执行结果:

    可以看到,我们循环了 10 次,执行任务,但是线程只用到了 1-5 ,其中有多次复用

    再比如,我们按照各种类型的线程池,自己定义一个线程池,核心线程数 2, 最大线程数 5,阻塞队列长度为 3:

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        
        try {
            //模拟10个顾客办理业务
            for (int i = 0; i < 10; i++){
                //execute 执行方法,传入参数为实现了 Runnable 接口的类
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"号线程办理业务");
                });
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
    

    同样 10 个线程,执行起来:

    可以看到,执行了 8 个任务后,就抛出了异常,说明执行了拒绝策略

    上面两个示例,我们的任务本身都是没有返回值的,如果创建的任务本身需要有返回值就需要实现 Callable 接口,然后搭配FutureTask 来传入任务,那么线程池就应该调用 submit 方法而不是 execute。


    二、线程池底层原理


    2.1 线程池执行逻辑


    处理的流程核心就 execute() 方法,他接收一个实现了 Runnable 接口的任务,决定对这个任务的处理策略。

    下图是一个比较形象的策略流程:

    可能的情况有四种,也就是图中的1234:

    1. 如果线程池中的线程数量少于corePoolSize,就创建新的核心线程来执行新添加的任务
    2. 如果线程池中的线程数量大于等于corePoolSize,但队列workQueue未满,则将新添加的任务放到队列workQueue中
    3. 如果线程池中的线程数量大于等于corePoolSize,且队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的非核心线程来处理被添加的任务
    4. 如果线程池中的线程数量等于了maximumPoolSize,就用RejectedExecutionHandler来执行拒绝策略。会抛出异常,一般的拒绝策略是RejectedExecutionException

    注意,执行的顺序,在 java 里有一个不合理的地方:
    在池里安排任务的时候,我们的核心线程,队列,非核心线程里面排的任务顺序应该是 1 2 3;

    但是真正实现上,如果三个都满了,开始执行的时候,依次执行的顺序却是 核心线程,非核心线程,队列。也就是执行顺序会变成 1 3 2

    2.2 拒绝策略


    有些时候,我们并不希望拒绝策略是直接抛出异常,那么 jdk 里面提供的默认拒绝策略有 4 种,他们体现在代码中就是 ThreadPoolExecutor 的四个静态内部类:

    2.2.1 CallerRunsPolicy:调用者运行策略。

    这种策略不会抛弃任务,也不抛出异常,而是将某些任务回退给调用者,从而降低新任务的流量。

    实现非常简单,那就是如果说 e 这个线程池已经 shutdown 了,那么就什么也不干,也就是这个任务直接丢了;否则,r.run() ,相当于调用这个方法的线程里直接执行了这个 Runnable 任务。

    此时我们可以把 1.3 里的代码修改一下,只修改策略为 CallerRunsPolicy:

    可以看到,有些任务会在 main 线程里处理。

    2.2.2 AbortPolicy:终止策略。

    抛异常。前面已经试过了,这个是默认的拒绝策略。

    2.2.3 DiscardPolicy:丢弃任务。

    可以看到,源码里就是是什么也不做。如果场景中允许任务丢失,这个是最好的策略。

    2.2.4 DiscardOldestPolicy:抛弃队列中等待最久的任务。

    抛弃队列中等待最久的任务,然后把当前的任务加入队列中,尝试再次提交当前任务。

    源码里也就是利用队列操作,进行一次出队操作,然后重新调用 execute 方法。

    2.3 线程池的五种状态


    一个正常的线程的生命周期区别开,这个是线程池里线程的状态。

    1. Running,能接受新任务以及处理已添加的任务;
    2. Shutdown,不接受新任务,可以处理已经添加的任务,也就是不能再调用execute或者submit了;
    3. Stop,不接受新任务,不处理已经添加的任务,并且中断正在处理的任务;
    4. Tidying,所有的任务已经终止,CTL记录的任务数量为0,CTL负责记录线程池的运行状态与活动线程数量;
    5. Terminated,线程池彻底终止,则线程池转变为terminated的状态。

    如图所示,从running状态转换为 shutdown,调用 shutdown()方法;如果调用shutdownNow()方法,就直接会变成stop。

    terminated()是钩子函数,默认是什么也不做的,我们可以重写,然后决定结束之前要做一些别的处理逻辑。这个钩子函数,就是模板模式的方法。

    三、阻塞队列


    线程池里的 BlockingQueue,阻塞队列,事实上在消费者生产者问题里的管程法实现,我们的策略也是类似阻塞队列的,用它来做一个缓存池的作用。

    阻塞队列:任意时刻,不管并发有多高,永远保证只有一个线程能够进行队列的入队或出队操作。也就意味着他是能够保证线程安全的。

    另外,阻塞队列分为有界和无界队列,理论上来说一个是队列的size有固定,另一个是无界的。对于有界队列来说,如果队列存满,只能出队了,入队操作就只能阻塞。

    在 juc 包里,阻塞队列的实现有很多:

    1. ArrayBlockingQueue:有界阻塞队列;
    2. LinkedBlockingQueue:链表结构(大小默认值为Integer.MAX_VALUE)的阻塞队列;
    3. PriorityBlockingQueue:支持优先级排序的无界阻塞队列;
    4. DelayQueue:使用优先级队列实现的延迟无界阻塞队列;
    5. SynchronousQueue:不存储元素的阻塞队列,相当于只有一个元素;
    6. LinkedTransferQueue:链表组成的无界阻塞队列;
    7. LinkedBlockingDeque:链表组成的双向阻塞队列。

    对于 BlockingQueue 来说,核心操作主要有几类:插入、删除、查找。

    其中的四种异常策略:

    • 抛异常:如果阻塞队列满,再往队列里 add 插入元素会抛 IllegalStateException:Queue full,如果阻塞队列空,再 remove 就会抛 NoSuchElementException。
    • 特殊值:offer 方法:成功 true,失败 false,poll 方法,成功就返回元素,没有就返回 null。
    • 阻塞:阻塞队列满的时候,生产者线程继续 put 元素,队列就会阻塞直到可以 put 数据或者响应中断然后退出,阻塞队列空的时候,消费者线程继续 take 元素,队列就会一直阻塞直到有元素可以 take。
    • 超时退出:阻塞队列满的时候,会阻塞生产者线程且超时退出,空的时候会阻塞消费者线程且超时退出。

    那么使用的时候,增删的方法按对应的同一组使用比较合理。(其实这个策略的设计对应的在单线程集合里也有,那就是Deque接口的实现类 LinkedList 使用的时候,不同的增删方法策略不同)

  • 相关阅读:
    8086汇编中的逻辑地址与物理地址转换
    wepy开发踩坑记录
    cordova开发的坑
    express转发请求
    Hybrid app(cordova) 环境配置记录
    laravel-mix 热重载404的问题
    练习
    git 使用记录
    Vue全家桶开发笔记
    微信小程序开发踩坑记录
  • 原文地址:https://www.cnblogs.com/lifegoeson/p/13780720.html
Copyright © 2011-2022 走看看