zoukankan      html  css  js  c++  java
  • 阻塞队列、线程池、异步

    参考
    https://www.cnblogs.com/aspirant/p/8657801.html
    https://www.cnblogs.com/linguanh/p/8000063.html
    《java多线程编程实战指南》

    BlockingQueue

    阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

    从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
    常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)

    • 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
    • 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。  
      多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒),下面两幅图演示了BlockingQueue的两个常见阻塞场景:

    上图所示:当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

    如上图所示:当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
    这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。

    BlockingQueue的核心方法

    放入数据

    (1)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);
    (2)offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
    (3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

    获取数据

    (1)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
    (2)poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
    (3)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
    (4)drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    常见BlockingQueue

    ArrayBlockingQueue

    基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
    ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue,可能导致锁的高争用,进而导致较多的上下文切换;
    ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。ArrayBlockingQueue不会增加GC负担,这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

    LinkedBlockingQueue

    基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
    作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

    SynchronousQueue

    一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
    声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:

    • 如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
    • 但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

    对比

    ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列。

    • 是否有界
      ArrayBlockingQueue是有界队列, LinkedBlockingQueue既可以有界也可以无界
      调度
    • 调度
      LinkedBlockingQueue仅支持非公平调度
      ArrayBlockingQueue和SynchronousQueue支持公平和非公平调度
    • 适用场景
      LinkedBlockingQueue适合生产者和消费者线程并发程度较大的场景;
      ArrayBlockingQueue适合生产者和消费者线程并发程度较低的场景;
      SynchronousQueue适合生产者和消费者处理能力相差不大的场景。

    其他阻塞队列

    DelayQueue

    DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
    使用场景:DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

    PriorityBlockingQueue

    基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

    线程池

    线程工厂

    线程工厂可以统一线程生成的样式,增加线程异常处理对象、定制线程名称等。

    为什么用线程池

    线程相比普通的对象而言,会占用额外的存储空间—栈空间,此外线程启动会产生相应的线程调度开销,线程的销毁也有开销,系统能够创建的线程数受限于系统的处理器数目。
    通过线程池来使用线程更加有效,避免不必要的反复创建线程的开销。

    线程池的基本参数和原理

    线程池预先创建一定数目的工作者线程,客户端不需要向线程池借用线程而是将其需要执行的任务作为一个对象提交给线程池,线程池可能将这些任务缓存在队列之中,而线程池内部的各个工作者线程则不断取出任务并执行之。因此,线程池可以看做基于生产者—消费者模式的一种服务。
    ThreadPoolExecutor类是一个常用的线程池,客户端可以调用ThreadPoolExecutor.submit方法提交任务

    Public Future<?> submit(Runnable task); 
    Public Future< T > submit(Callable<T> task);
    

    Task如果是一个Runnable实例,没有返回结果,Task如果是Callable实例,可以由返回结果。
    线程池内部维护的工作者线程数量称为线程池的大小。线程池大小有3种形态,当前线程池大小表示线程池种实际工作者线程的数量;最大线程池大小表示线程池中允许存在的工作者线程的数量上限;核心线程大小表示一个不大于最大线程池大小的工作者线程数量上限。

    当前线程池大小<=核心线程大小<=最大线程池大小
    或者
    核心线程大小<=当前线程池大小<=最大线程池大小
    

    构造参数

    public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
      //...
    }
    

    workQueue: 称为工作队列的阻塞队列
    corePoolSize:线程池核心大小
    maximumPoolSize:最大线程池大小
    keepAliveTime和unit指定线程池中空闲线程的最大存活时间
    threadFactory 指定创建工作者线程的线程工厂

    初始状态下,客户端每提交一个任务线程池就创建一个工作者线程来处理任务。随着任务的提交,当前线程池大小相应增加,在当前线程池大小达到核心线程池大小是,新来的任务被存入到工作队列之中。这些缓存的任务由线程池种所有的工作者线程负责取出进行执行。线程池将任务放入工作队列的时候调用的是BlockingQueue的非阻塞方法offer(E e),所以当工作队列满的时候不会使提交任务的客户端线程暂停。当工作队列满的时候,线程池会继续创建新的工作者线程,直到当前线程池大小达到最大线程池大小。
    线程池是通过threadFactory的newThread方法来创建工作者线程的。如果在创建线程池的时候没有指定线程工厂(调用了ThreadPoolExecutor的其他构造器),那么ThreadPoolExecutor会使用Executord.defaultThreadFactory()所返回的默认线程工厂。
    当线程池饱和的时候,即工作队列满且当前线程池大小达到最大线程池大小的情况下,客户端试图提交的任务就会被拒绝。RejectExecutionHandler接口用于封装被拒绝的任务的处理策略,ThreadPoolExecutor提供几个现成的RejectExecutionHandler的实现类,其中ThreadPoolExecutor.AbortPolicy是ThreadPoolExecutor使用的默认RejectExecutionHandler。如果默认的AbortPolicy无法满足可以优先考虑ThreadPoolExecutor提供的其他RejectExecutionHandler,其次考虑自行实现RejectExecutionHandler。
    以下为拒绝策略

    • AbortPolicy:直接抛出异常,默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;
      在当前线程池大小超过核心线程池大小的时候,超过核心线程池大小部分的工作者线程空闲(即工作者队列中没有待处理的任务)时间达到了keepAliveTime所指定的时间后就会被清理掉,即这些工作者线程会自动终止并被从线程池中被移除,需要谨慎设置,否则造成线程反复创建。
      ThreadPoolExecutor.shutdown()/ shutdownNow()方法可以用来关闭线程池。前者会等待已提交线程继续执行,禁止提交新任务,后者直接停止会将正在执行的任务停止,已提交未执行的任务会也不会执行。

    线程池任务执行结果

    如果向线程池提交任务不需要返回结果,提交的任务为Runnable,如果需要处理结果,则提交的任务需要为Callable。
    Public Future<?> submit(Runnable task);
    Public Future< T > submit(Callable task);
    Callable接口是runnable接口的增强,call方法的返回值代表相应任务的处理结果,call方法在执行中可以抛出异常,而runnable接口无返回值也无法抛出异常。
    Future接口实例可以看做提交给线程池执行任务的处理结果句柄,Future.get()方法可以用来获取task参数指定的任务的处理结果。Future.get()被调用时,如果任务未执行完,那么Future.get()会使当前线程暂停,直到相应的任务执行结束,Future.get()是一个阻塞方法,能抛出InterruptedException说明可以响应线程中断。如果任务抛出一个任意的originalExeption,那么Future.get()会抛出ExcutionException,通过调用ExcutionException的getCause()方法可以返回originalExeption,从而捕获原始的异常。Future可以执行cancel(boolean mayInterruptRunning)来取消任务,mayInterruptRunning表示是否允许通过发送中断来取消任务。返回值表示相应任务是否取消成功。另外可以通过isCancelled判断任务是否取消成功,通过isDone判断任务是否执行完成。
    另外线程池提供了一系列监控方法。
    线程池死锁,不应该将有依赖关系的任务提交给同一个线程池,避免死锁。
    注意应该尽早提交任务,尽量晚的执行Future.get(),减少上下文切换。

    异步

    Executor

    Executor接口是对任务执行的抽象,定义了如下方法

    Void execute (Runnable command);
    

    Executor接口使任务的提交和任务执行的具体细节解耦
    不过Executor功能有限,只能执行任务,无法返回结果;Executor需要工作者线程,但是没有释放工作者线程资源的方法。ExecutorService接口继承Executor接口,解决了以上的问题。ExecutorService定义了几个submit方法可以接受Callable接口或者Runnable接口表示的任务并返回相应的Future实例。ExecutorService还定义了shutdown和shutdownNow方法来释放相关资源。ThreadPoolExecutor是ExecutorService的默认实现类。

    Executors与常用线程池

    Executors是一个实用的工具类,可以返回默认线程工厂,将Runnable实例转成Callable实例,还提供了一些能够返回ExecutorService实例的快捷方法,这样不用手动创建ThreadPoolExecutor。

    • newFixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads){
        return new ThreadPoolExecutor(
                nThreads,   // corePoolSize
                nThreads,   // maximumPoolSize == corePoolSize
                0L,         // 空闲时间限制是 0
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>() // 无界阻塞队列
            );
    }
    

    newFixedThreadPool 线程永不过期,适合固定线程的场景。

    • newCachedThreadPool
    public static ExecutorService newCachedThreadPool(){
        return new ThreadPoolExecutor(
            0,                  // corePoolSoze == 0
            Integer.MAX_VALUE,  // maximumPoolSize 非常大
            60L,                // 空闲判定是60 秒
            TimeUnit.SECONDS,
            // 神奇的无存储空间阻塞队列,每个 put 必须要等待一个 take
            new SynchronousQueue<Runnable>()  
        );
    }
    

    newCachedThreadPool适合频繁短时任务
    由于核心线程池大小为0,因此提交给线程池执行的第一个任务会导致该线程池的第一个工作者线程创建启动,后续任务提交的时候,由于当前线程池大小已经查过核心线程池大小(0),因此ThreadPoolExecutor会将任务缓存到工作队列中,即调用workerQueue.offer方法。
    SynchronousQueue内部并不维护用于存储队列元素的实际存储空间,一个线程(生产者)在执行SynchronousQueue. Offer(E)的时候,如果没有其他线程(消费者)因执行SynchronousQueue. take()而被暂停,那么SynchronousQueue. Offer(E)调用会直接抛出false,即如队列失败,说明所有的工作者线程都在运行,即无空闲工作者线程的情况下给提提交任务会导致该任务无法被缓存成功。当ThreadPoolExecutor在缓存失败且线程池当前大小未达到最大线程池大小的情况下会启动新的工作者线程,极端情况下每提交一个任务均会创建一个新的工作者线程,会导致线程数太多,过多上下文切换导致系统被拖慢。
    即在有一个任务被缓存的情况下,会一直增加工作者线程。

    • newSingleThreadExecutor
    public static ExecutorService newSingleThreadExecutor() {
            return 
                new FinalizableDelegatedExecutorService
                    (
                        new ThreadPoolExecutor
                            (
                                1,
                                1,
                                0L,
                                TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory
                            )
                    );
        }
    

    newSingleThreadExecutor适合多生产者-单消费者
    可以看到除了多了个 FinalizableDelegatedExecutorService 代理,其初始化和 newFiexdThreadPool 的 nThreads = 1 的时候是一样的。
    区别就在于:
    • newSingleThreadExecutor返回的ExcutorService在析构函数finalize()处会调用shutdown()
    • 如果我们没有对它调用shutdown(),那么可以确保它在被回收时调用shutdown()来终止线程。
    使用ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,一般采用默认。

    FutereTask

    FutereTask融合了Runnable和Callable的优点。FutereTask是Runnable接口的一个实现类,FutereTask表示的异步任务可以提交给Executor实例或者工作者线程,此外FutereTask可以直接返回其代表的异步任务的处理结果。FutereTask的一个构造方法可以将Callable封装成FutereTask,相当于将Callable转成了Runnable,同时还可以通过FutereTask查看处理结果。
    ThreadPoolExecutor.submit(Callable task)继承AbstractExecutorService.submit方法,实际将Callable对象封装成了FutereTask。

    Executorservice.submit和excutorservice.execute的区别

    ExecutorService接口继承Executor接口,Executor是最上层的,其中只包含一个execute()方法, execute()方法的入参为一个Runnable,返回值为void

    public interface Executor {
        void execute(Runnable command);
    }
    

    submit()是ExecutorService接口中的方法,在ExecutorService接口中,一共有以上三个sumbit()方法,入参可以为Callable,也可以为Runnable,而且方法有返回值Future

    public interface ExecutorService extends Executor {
      ...
      <T> Future<T> submit(Callable<T> task);
    
      <T> Future<T> submit(Runnable task, T result);
    
      Future<?> submit(Runnable task);
      ...
    }
    

    execute()和submit()方法的区别总结:
    (1)接收的参数不一样;
    (2)submit()有返回值,而execute()没有;
    (3)Exception处理方式不同
    例如,如果task里会抛出checked或者unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那么就需要用到submit,通过对Future.get()进行抛出异常的捕获,然后对其进行处理。
    如果调用execute()提交任务中抛出来未捕获的异常,则对其进行执行任务的工作者线程就会异常中止,虽然线程池会创建新的工作者线程,但是这个有开销。
    UncaughtExceptionHandler可以用于捕获线程异常,UncaughtExceptionHandler只有在execute()方法里异常处理类才能生效,通过submit()提交的任务,UncaughtExceptionHandler无法生效。

    多线程的几种实现方式

    (1)继承Thread,重写run方法;
    (2)通过Runnable对象构建Thread对象;
    (3)线程池,ThreadPoolExecutor或者Executors快捷生成线程池;

  • 相关阅读:
    线段树快速查找区间值
    html学习笔记
    区块链是怎么运行的
    【C++ 流类库与输入输出 】实验七
    【C++ 实验六 继承与派生】
    10天冲刺第四天后端app开发
    10天冲刺第三天后端app开发
    10天冲刺第二天之完成后端
    第二次冲刺第一天之后台管理
    第一阶段--冲刺总结
  • 原文地址:https://www.cnblogs.com/lllliuxiaoxia/p/15724940.html
Copyright © 2011-2022 走看看