zoukankan      html  css  js  c++  java
  • Executor 框架

    Executor

    Executor

    Executor 是J.U.C的一个接口,用来处理多线程的。直接说这个可能不太熟,但是大名鼎鼎的ThreadPoolExecutor就是实现了这个接口。

    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    

    ExecutorService

    ExecutorService接口又继承了Executor新增了新的function。

    Callable

    熟悉Thread的都知道,如果要使用一个线程,最原始的方法,是实现Runnable接口,或者是直接继承Thread类并重写run。但是这2种方式都是void。如果需要线程处理的结果就要用Callable

    @FunctionalInterface
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    

    在实际使用中,线程池的使用是必然的。【正确的废话】

    ThreadPoolExecutor作为线程池的核心类,只要是用线程池是必定无法绕过去的。可以处有返回无返回的task。

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    

    FutureTask

    FutureTask类实现了RunnableFuture接口,RunnableFuture继承了RunnableFuture接口

    FutureTask的构造函数 分别可以将CallableRunnable 最后返回一个带结果的FutureTaskFutureTask通过Executors.callable(runnable,result)Runnable进行了封装。

    Executor 模型

    main thread 提交task,但是小于corePoolSize的thread在运行,即使其他worker 处于idle,也会有一个·new thread·创建并处理提交的task

    如果运行中的threads 数量大于corePoolSize,但是小于maximumPoolSize,只有当workQueue full的状态下才会创建一个new thread来处理task。也就是说如果workQueue没满的状态下,提交的task会被安排进入workQueue

    如果maximumPoolSizeworkQueue都满了,就会由RejectedExecutionHandler拒绝提交的task

    executor.submit()

    submit作为executor接口的方法,就是提交Callable<T> 或者Runnabletask,最后会返回一个Future<T>的结果

    FutureTask.get()方法或者FutureTask.get(long timeout, TimeUnit unit)方法来获取返回值

    executor.execute()

    execute用来处理没有返回值的任务。

    ThreadPoolExecutor

    1.8 中有4个构造方法,先上2个

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    

    参数

    序号 名称 类型 含义
    1 corePoolSize int 核心线程池大小
    2 maximumPoolSize int 线程池最大size
    3 keepAliveTime long pool超过corepoolsize时,多余的thread可以idle的时间,如果超过这个时间就会terminated
    4 unit TimeUnit 时间单位
    5 workQueue BlockingQueue 线程等待队列
    6 threadFactory ThreadFactory 线程创建工厂
    7 handler RejectedExecutionHandler 拒绝策略
    • corepoolsize Core pool size is the minimum number of workers to keep alive (and not allow to time out etc) unless allowCoreThreadTimeOut is set, in which case the minimum is zero.

      workers keep alive数量的最小值。它的值是决定task是创建新线程还是 进入workQueue。

    • maximumPoolSize Maximum pool size. Note that the actual maximum is internally bounded by CAPACITY.

      pool的最大值。根据使用的workQueue类型,决定线程池开辟的最大线程数量。

    • keepAliveTime Timeout in nanoseconds for idle threads waiting for work. Threads use this timeout when there are more than corePoolSize present or if allowCoreThreadTimeOut. Otherwise they wait forever for new work.

      当idle的thread数量大于corepoolsize,多余的线程在多长时间被销毁

    • unit keepAliveTime 的单位

    • workQueue The queue used for holding tasks and handing off to worker threads. We do not require that workQueue.poll() returning null necessarily means that workQueue.isEmpty(), so rely solely on isEmpty to see if the queue is empty (which we must do for example when deciding whether to transition from SHUTDOWN to TIDYING). This accommodates special-purpose queues such as DelayQueues for which poll() is allowed to return null even if it may later return non-null when delays expire.

      用来存放task,并将task转给worker thread。

    • threadFactory Factory for new threads. All threads are created using this factory (via method addWorker).

      factory 就是用来创建线程的工厂

    • handler Handler called when saturated or shutdown in execute.

      当task太多,超过了线程池处理的时候,就会使用拒绝策略。

      • AbortPolicy 默认策略,抛出RejectedExecutionException异常
      • CallerRunsPolicy 会⽤调⽤execute函数的上层线程去执行被拒
        绝的任务
      • DiscardOldestPolicy 抛弃最旧的任务,再把这个新任务添加到
        队列
      • DiscardPolicy 抛弃当前的任务
      • 或者自定义策略实现RejectedExecutionHandler接口

    FixedThreadPool

    固定线程数的线程池

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    corePoolSize = maximumPoolSize

    keepAliveTime的值为0L,表明工作线程如果空闲会被立即终止

    用的是LinkedBlockingQueue,是链表型的无界队列,队列容量为Integer.MAX_VALUE

    无界队列

    无界队列带来的问题是,所有task都可以存放在queue中,(在硬件资源足够大的场景下)永远不会有满的状态。

    这样的情况下,线程池中的thread永远不会超过corepoolsize。

    同时maximumPoolSize和keepAliveTime都无效,而且不会因为线程数量问题触发饱和策略。

    工作流程:

    • 如果当前运行中的线程数小于corepoolsize,就new thread执行task

    • 如果当前运行的线程等于corepoolsize,就将task放入LinkedBlockingQueue

    • 当worker执行完后,不会销毁,会循环的从LinkedBlockingQueue中获取task来执行

    SingleThreadExecutor

    单个线程的线程池

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    corePoolSize = maximumPoolSize = 1,实际只使⽤了corePool

    keepAliveTime =0;表明工作线程如果空闲会被立即终止

    用的是LinkedBlockingQueue,是链表型的无界队列,队列容量为Integer.MAX_VALUE

    基本和FixedThreadPool 一样,但是只有一个thread

    工作流程

    • 线程池中无运行的线程,就会创建一个new thread执行task
    • 如果线程池中已有正在运行的线程,就将任务加入到LinkedBlockingQueue
    • 当worker thread完成task,不会立即销毁,而是从LinkedBlockingQueue循环获取task

    CachedThreadPool

    需要创建新线程的线程池

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    corePoolSize =0 maximumPoolSize =Integer.MAX_VALUE, 说明线程池理论上无边界

    SynchronousQueue 是一个没有容量的阻塞队列,每一次入队操作必须要有另一个出队操作与之匹配,反之亦然。

    keepAliveTime =60s;表明工作线程如果空闲60s会被终止

    当CachedThreadPool中的线程数达到maximumPoolSize ,会调用饱和策略。

    工作流程

    • 当主线程首先执行Synchronous.offer() ,即入队。如果当前线程池正好有空闲的thread执行poll().,那么offer和poll配对成功,主线程会将task移交给空闲的thread执行。
    • 如果线程池为空,或者没有空闲的线程,此时的offer和poll匹配失败。cachedThreadPool 就会创建新的线程执行task。
    • 当worker thread执行完成task,会执行poll(),该操作会让空闲线程等60s。如果60s内有offer操作,此时空闲线程就会获得新的task,否则就会被terminated

    ScheduledThreadPoolExecutor

    单个线程

     public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
    

    当然他还有其他的构造函数,可以设定corepoolsize。

    ScheduledThreadPoolExecutor(int corepoolsize)内部调用

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    

    corePoolSize >=1,maximumPoolSize =Integer.MAX_VALUE

    keepAliveTime =0;表示线程一旦空闲就会终止。

    特别的地方在于使用了DelayedWorkQueue,内部封装了一个优先队列,可以实现task的定期执行和延后执行。而且这个queue可以支持自动扩容,最大容量Integer.MAX_VALUE

    工作流程

    • 当调用ScheduledThreadPoolExecutor的 scheduleAtFixedRate() 方法或者
      scheduleWithFixedDelay() 方法时,会向DelayQueue 添加⼀个实现了
      RunnableScheduledFuture 接口的 ScheduledFutureTask 。
    • 然后从DelayQueue获取ScheduledFutureTask

    • time 代表具体执行时间
    • sequenceNumber 代表插入的序号
    • period: 表示任务执行的时间间隔

    所以通过三个参数,DelayQueue会维护一个优先级,时间小的先被执行。如果time相同,则按seq进行排序,序号较小的先执行

    Tag

    queue,executor

  • 相关阅读:
    虚拟主机服务器错误404解决方法
    虚拟主机服务器错误404解决方法
    虚拟主机服务器错误404解决方法
    以太坊:Truffle开发入门
    区块链是低成本的工业化基因测序解决方案
    区块链公证和知识产权(IP)保护原理
    区块链公证和知识产权(IP)保护应用场景
    ACOUG China Tour 2019上海站,等你来约!
    解析MySQL基础架构及一条SQL语句的执行流程和流转
    jquery 查找节点
  • 原文地址:https://www.cnblogs.com/dreamtaker/p/14697453.html
Copyright © 2011-2022 走看看