zoukankan      html  css  js  c++  java
  • spring对java线程池封装源码解析

    Spring 的 ThreadPoolTaskExecutor 类最终是通过调用 Java 的 ThreadPoolExecutor 的 void execute(Runnable command) 方法或 Future<?> submit(Runnable task) 方法来执行任务的。

    下面是spring的任务执行类和接口的继承层次

    interface Executor

         void execute(Runnable command);

    interface TaskExecutor extends Executor

        void execute(Runnable task);

    interface AsyncTaskExecutor extends TaskExecutor

        void execute(Runnable task, long startTimeout);

        Future<?> submit(Runnable task);

        <T> Future<T> submit(Callable<T> task);

    interface SchedulingTaskExecutor extends AsyncTaskExecutor

         boolean prefersShortLivedTasks();

     任务执行类

    class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor

     成员变量

    // The concrete pool used to run tasks: assigned in initializeExecutor(...) and
    // handed out by getThreadPoolExecutor(); stays null until initialization has run.
    private ThreadPoolExecutor threadPoolExecutor;

    执行任务方法

    /**
     * Hand the given task to the underlying thread pool.
     * @param task the Runnable to run
     * @throws TaskRejectedException if the pool refuses the task; the original
     * RejectedExecutionException is preserved as the cause
     */
    public void execute(Runnable task) {
        final Executor pool = getThreadPoolExecutor();
        try {
            pool.execute(task);
        }
        catch (RejectedExecutionException rejection) {
            // Translate the JDK rejection into Spring's own exception type.
            String reason = "Executor [" + pool + "] did not accept task: " + task;
            throw new TaskRejectedException(reason, rejection);
        }
    }

    /**
     * Return the underlying ThreadPoolExecutor.
     * @return the pool held in the {@code threadPoolExecutor} field (never {@code null})
     * @throws IllegalStateException if the field has not been set yet
     */
    public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
        final ThreadPoolExecutor pool = this.threadPoolExecutor;
        Assert.state(pool != null, "ThreadPoolTaskExecutor not initialized");
        return pool;
    }

    基类ExecutorConfigurationSupport

    abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
    implements BeanNameAware, InitializingBean, DisposableBean

    其中基类CustomizableThreadFactory为自定义线程工厂类

     成员变量

    // Holds whatever initializeExecutor(...) returns; assigned in initialize().
    private ExecutorService executor;

    生命周期初始化
    /**
     * Lifecycle initialization hook; does nothing but delegate to {@code initialize()}.
     * (The enclosing class is declared as implementing InitializingBean.)
     */
    public void afterPropertiesSet() {
    initialize();
    }

    初始化方法

    /**
     * Set up the ExecutorService.
     * <p>Logs the initialization at info level, uses the bean name (plus "-") as
     * the thread name prefix when no prefix was set explicitly and a bean name is
     * available, and then stores the result of
     * {@code initializeExecutor(threadFactory, rejectedExecutionHandler)} in the
     * {@code executor} field.
     */
    public void initialize() {
        if (logger.isInfoEnabled()) {
            // The separating space belongs to the optional bean-name part, so the
            // message has neither a double space (bean name present) nor a trailing
            // space (bean name absent).
            logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
        }
        if (!this.threadNamePrefixSet && this.beanName != null) {
            setThreadNamePrefix(this.beanName + "-");
        }
        this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }

     抽象方法(子类ThreadPoolTaskExecutor实现)

    /**
     * Create the target ExecutorService instance. Declared abstract here;
     * {@code initialize()} calls it with this object's thread factory and
     * rejection handler and keeps the returned instance in the {@code executor} field.
     * @param threadFactory the ThreadFactory to use
     * @param rejectedExecutionHandler the RejectedExecutionHandler to use
     * @return the newly created ExecutorService
     */
    protected abstract ExecutorService initializeExecutor(
    ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);

    生命周期销毁方法

    /**
     * Lifecycle destruction hook; does nothing but delegate to {@code shutdown()}.
     * (The enclosing class is declared as implementing DisposableBean.)
     */
    public void destroy() {
    shutdown();
    }

     ThreadPoolTaskExecutor 对抽象方法 ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) 的实现

    /**
     * Build the ThreadPoolExecutor backing this task executor.
     * <p>The work queue comes from {@code createQueue(queueCapacity)}; pool sizes
     * and keep-alive time (in seconds) are taken from this object's fields. The
     * new pool is remembered in {@code threadPoolExecutor} and also returned.
     * @param threadFactory the ThreadFactory handed to the pool
     * @param rejectedExecutionHandler the handler handed to the pool
     * @return the newly created ThreadPoolExecutor
     */
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        // Create the queue first, before any of the sizing fields are read.
        final BlockingQueue<Runnable> workQueue = createQueue(this.queueCapacity);

        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                this.corePoolSize,
                this.maxPoolSize,
                this.keepAliveSeconds,
                TimeUnit.SECONDS,
                workQueue,
                threadFactory,
                rejectedExecutionHandler);

        // Only touch the core-thread timeout setting when it was switched on.
        if (this.allowCoreThreadTimeOut) {
            pool.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = pool;
        return pool;
    }

    我们注意到,上面的方法 initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) 是由父类 ExecutorConfigurationSupport 的 initialize() 方法调用的,其返回值用于初始化父类成员变量 private ExecutorService executor;

    而ThreadPoolTaskExecutor实际执行任务是采用的自身成员变量private ThreadPoolExecutor threadPoolExecutor;

    /**
     * Return the underlying ThreadPoolExecutor.
     * @return the pool held in the {@code threadPoolExecutor} field (never {@code null})
     * @throws IllegalStateException if the field has not been set yet
     */
    public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
        final ThreadPoolExecutor pool = this.threadPoolExecutor;
        Assert.state(pool != null, "ThreadPoolTaskExecutor not initialized");
        return pool;
    }

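    不明白这里为什么要这么处理?不过从上面的代码可以看出,这两个成员变量指向的是同一个 ThreadPoolExecutor 实例:initializeExecutor 方法先把它赋给 this.threadPoolExecutor,再把它作为返回值交给父类保存到 executor 中。推测父类只需要以 ExecutorService 接口类型持有它,而子类保留具体类型的引用,是为了不做强制类型转换就能直接使用 ThreadPoolExecutor 特有的方法。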

    再看其他部分

    ExecutorConfigurationSupport 里面配置默认拒绝策略

    // Rejection handler passed to initializeExecutor(...); defaults to AbortPolicy.
    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

    ThreadPoolTaskExecutor成员变量

    // Core pool size handed to the ThreadPoolExecutor constructor; defaults to 1.
    private int corePoolSize = 1;

    private int maxPoolSize = Integer.MAX_VALUE;// default maximum pool size

    // Keep-alive time, passed to the constructor with TimeUnit.SECONDS; defaults to 60.
    private int keepAliveSeconds = 60;

    // When true, initializeExecutor calls allowCoreThreadTimeOut(true) on the new pool.
    private boolean allowCoreThreadTimeOut = false;

    private int queueCapacity = Integer.MAX_VALUE;// default queue capacity

    阻塞队列创建方法

    /**
     * Create the BlockingQueue to use for the thread pool.
     * <p>A positive capacity yields a LinkedBlockingQueue bounded to that
     * capacity; zero or a negative value yields a SynchronousQueue.
     * @param queueCapacity the requested capacity
     * @return the queue instance
     */
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        // No usable capacity: fall back to a direct hand-off queue.
        if (queueCapacity <= 0) {
            return new SynchronousQueue<Runnable>();
        }
        return new LinkedBlockingQueue<Runnable>(queueCapacity);
    }

  • 相关阅读:
    linux下shell显示-bash-4.1#不显示路径解决方法
    update chnroute
    An error "Host key verification failed" when you connect to other computer by OSX SSH
    使用dig查询dns解析
    DNS被污染后
    TunnelBroker for EdgeRouter 后记
    mdadm详细使用手册
    关于尼康黄的原因
    Panda3d code in github
    Python实例浅谈之三Python与C/C++相互调用
  • 原文地址:https://www.cnblogs.com/chenying99/p/2821375.html
Copyright © 2011-2022 走看看