zoukankan      html  css  js  c++  java
  • 线程之ExecutorService

    总览

    Executor

    package java.util.concurrent;
    
    public interface Executor {
    
        void execute(Runnable command);
    }
    
    • 命令模式
    • 分离任务提交和任务执行

    ExecutorService

    package java.util.concurrent;
    import java.util.List;
    import java.util.Collection;
    
    public interface ExecutorService extends Executor {
    
        void shutdown();
    
        List<Runnable> shutdownNow();
    
        boolean isShutdown();
    
        boolean isTerminated();
    
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        <T> Future<T> submit(Callable<T> task);
    
        <T> Future<T> submit(Runnable task, T result);
    
        Future<?> submit(Runnable task);
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    
    • 提供具有返回值的Callable对象
    • 异步任务提交结果
    • 批量任务提交
    • 生命周期管理

    Runnable转Callable

    • 适配器模式
    • java.util.concurrent.Executors.RunnableAdapter
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

    Future

    package java.util.concurrent;
    
    public interface Future<V> {
    
        boolean cancel(boolean mayInterruptIfRunning);
    
        boolean isCancelled();
    
        boolean isDone();
    
        V get() throws InterruptedException, ExecutionException;
    
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    
    • 获取任务异步执行的结果

    异步原理

    示例

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            Future<String> submit = executorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName();
                }
            });
            String result = submit.get();
            System.out.println(result);
        }
    
    • 提交一个Callable对象或者Runnable对象到ExecutorService里,能够返回一个Future对象,并且可以通过Future对象获取Callable对象或者Runnable对象 的运行结果

    • 接口关系

    public interface RunnableFuture<V> extends Runnable, Future<V>
    public class FutureTask<V> implements RunnableFuture<V>
    
    • AbstractExecutorService
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    
    • FutureTask
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
    
    1. 在ExecutorService中,Runnable被适配为Callable
    2. RunnableFuture继承自Runnable和Future
    3. 实现类FutureTask封装了任务运行和结果之间的状态转换
    4. 提交给ExecutorService的Callable对象被封装成RunnableFuture
    5. execute方法传入的是RunnableFuture,返回的对象也是RunnableFuture,同一个对象
    6. 调用Future的get方法时,会被阻塞,知道任务执行完成后才会唤醒get阻塞,返回结果
    7. FutureTask阻塞使用LockSupport,阻塞队列使用Treiber stack

    AbstractExecutorService

    • 封装任务为FutureTask,实现通用submit
    • 实现invokeAny,代理给ExecutorCompletionService实现invokeAny语义
    • 实现invokeAll,遍历Future列表,调用get直至所有结果返回

    ExecutorCompletionService

    • 依赖Executor实现任务真正执行
    • 内部维护一个阻塞队列
    • 使用QueueingFuture继承FutureTask,当任务执行完成后,把Future对象放进队列
        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }
    
    • 提供take方法阻塞等待获取结果

    总结:维护一个阻塞队列,当一批任务执行时,每个任务执行完成后都会把Future对象放进队列,因此队列take到的都是执行完成的任务。该线程池的优点是不用自己阻塞每个Future,而是提交一批任务后,优先处理先完成的任务就好了。依赖这个特性实现invokeAny语义,只需要调用take方法,有返回就结束执行即可。

    • 使用示例见java.util.concurrent.AbstractExecutorService.doInvokeAny

    ThreadPoolExecutor

    • 主要实现execute方法,submit方法代理到这个方法的实际参数类型已经是RunnableFuture类型了

    构造方法

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    • 核心线程数
    • 最大线程数
    • 存活时间
    • 存活时间单位
    • 阻塞队列
    • 线程工厂
    • 拒绝策略

    execute流程

    1. 如果当前线程数量小于corePoolSize,直接创建Worker工作线程,执行任务

    2. 如果当前线程数量大于corePoolSize,任务进阻塞队列

    3. 如果阻塞队列满,线程数量小于maximumPoolSize,创建Worker执行任务

    4. 如果线程数达到maximumPoolSize,执行拒绝策略

    5. Worker工作线程启动后,从阻塞队列阻塞获取任务。

    6. 如果线程数大于corePoolSize,那么阻塞获取任务时会调用poll(keepAliveTime, TimeUnit.NANOSECONDS);如果阻塞超时没获取到任务,此时线程数大于corePoolSize会销毁当前线程

    7. 如果线程数不大于corePoolSize并且没有设置核心线程超时,一直take。可以通过allowCoreThreadTimeOut方法让核心线程超时

    8. 可以通过prestartCoreThread预启动核心线程

    9. 可以通过继承当前类重写beforeExecute,afterExecute方法进行任务执行拦截

    10. 拒绝策略:AbortPolicy抛异常;DiscardPolicy丢弃当前任务;DiscardOldestPolicy丢弃最老的任务;CallerRunsPolicy任务提交者执行

    11. CallerRunsPolicy比较有用,可以通过拉低提交速率,减缓线程池压力,达到背压的效果

    ScheduledExecutorService

    • 调度线程池,提供简单的调度规则
    • scheduleAtFixedRate固定速率
    • scheduleWithFixedDelay固定延时
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
    • 使用延时队列作为内部阻塞队列,最小堆算法
    public interface ScheduledFuture<V> extends Delayed, Future<V> {
    }
    private class ScheduledFutureTask<V>
    extends FutureTask<V> implements RunnableScheduledFuture<V>{
    //省略其他方法
            public void run() {
                boolean periodic = isPeriodic();
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                else if (!periodic)
                    ScheduledFutureTask.super.run();
                else if (ScheduledFutureTask.super.runAndReset()) {
                    setNextRunTime();
                    reExecutePeriodic(outerTask);
                }
            }
    }
    
    • 扩展FutureTask,周期调度任务会调用java.util.concurrent.FutureTask.runAndReset(),使周期任务可以重复被调度
    • 固定频率和固定延时的差异为下次任务执行时间计算,固定频率时执行完成时间点加上间隔;固定延时时任务开始时间加上间隔
    • 任务执行完成后,计算下次执行时间,重新进入延时队列调度
  • 相关阅读:
    mysql主从复制
    nginx代理tcp协议连接mysql
    spark安装配置
    DataX 3.0简介 安装及使用
    Consul 快速入门
    Etcd 使用入门
    常见负载均衡算法
    Container is running beyond memory limits
    HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}
    快速搞定Windows10环境下hadoop安装和配置
  • 原文地址:https://www.cnblogs.com/zby9527/p/13151379.html
Copyright © 2011-2022 走看看