zoukankan      html  css  js  c++  java
  • spring boot使用自定义配置的线程池执行Async异步任务

    一、增加配置属性类

    package com.chhliu.springboot.async.configuration; 
    import org.springframework.boot.context.properties.ConfigurationProperties; 
      
    @ConfigurationProperties(prefix = "spring.task.pool") // 该注解的locations已经被启用,现在只要是在环境中,都会优先加载 
    public class TaskThreadPoolConfig { 
     private int corePoolSize; 
      
     private int maxPoolSize; 
      
     private int keepAliveSeconds; 
      
     private int queueCapacity; 
       
     …………省略getter,setter方法………… 
    }

    二、创建线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package com.chhliu.springboot.async.pool; 
    import java.util.concurrent.Executor; 
    import java.util.concurrent.ThreadPoolExecutor; 
    import org.springframework.beans.factory.annotation.Autowired; 
    import org.springframework.context.annotation.Bean; 
    import org.springframework.context.annotation.Configuration; 
    import org.springframework.scheduling.annotation.EnableAsync; 
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 
      
    import com.chhliu.springboot.async.configuration.TaskThreadPoolConfig; 
      
    @Configuration
    @EnableAsync
    public class TaskExecutePool { 
      
     @Autowired
     private TaskThreadPoolConfig config; 
      
     @Bean
     public Executor myTaskAsyncPool() { 
      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
      executor.setCorePoolSize(config.getCorePoolSize()); 
      executor.setMaxPoolSize(config.getMaxPoolSize()); 
      executor.setQueueCapacity(config.getQueueCapacity()); 
      executor.setKeepAliveSeconds(config.getKeepAliveSeconds()); 
      executor.setThreadNamePrefix("MyExecutor-"); 
      
      // rejection-policy:当pool已经达到max size的时候,如何处理新任务 
      // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 
      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 
      executor.initialize(); 
      return executor; 
     } 
    }
    

      

     
     

    三、在主类中开启配置支持

    package com.chhliu.springboot.async; 
    import org.springframework.boot.SpringApplication; 
    import org.springframework.boot.autoconfigure.SpringBootApplication; 
    import org.springframework.boot.context.properties.EnableConfigurationProperties; 
    import org.springframework.scheduling.annotation.EnableAsync; 
      
    import com.chhliu.springboot.async.configuration.TaskThreadPoolConfig; 
      
    @SpringBootApplication
    @EnableAsync
    @EnableConfigurationProperties({TaskThreadPoolConfig.class} ) // 开启配置属性支持 
    public class SpringbootAsyncApplication { 
      
     public static void main(String[] args) { 
      SpringApplication.run(SpringbootAsyncApplication.class, args); 
     } 
    } 
    

      

     
     
     

    四、测试类

    package com.chhliu.springboot.async.pool; 
      
    import org.slf4j.Logger; 
    import org.slf4j.LoggerFactory; 
    import org.springframework.scheduling.annotation.Async; 
    import org.springframework.stereotype.Component; 
      
    @Component
    public class AsyncTask { 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass()); 
       
     @Async("myTaskAsyncPool") //myTaskAsynPool即配置线程池的方法名,此处如果不写自定义线程池的方法名,会使用默认的线程池 
     public void doTask1(int i) throws InterruptedException{ 
      logger.info("Task"+i+" started."); 
     } 
    }
    

      

     
     
     

    五、测试

    package com.chhliu.springboot.async; 
    import java.util.concurrent.ExecutionException; 
    import org.junit.Test; 
    import org.junit.runner.RunWith; 
    import org.slf4j.Logger; 
    import org.slf4j.LoggerFactory; 
    import org.springframework.beans.factory.annotation.Autowired; 
    import org.springframework.boot.test.context.SpringBootTest; 
    import org.springframework.test.context.junit4.SpringRunner; 
      
    import com.chhliu.springboot.async.pool.AsyncTask; 
      
    @RunWith(SpringRunner.class) 
    @SpringBootTest
    public class SpringbootAsyncApplicationTests { 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass()); 
     @Autowired
     private AsyncTask asyncTask; 
      
     @Test
     public void AsyncTaskTest() throws InterruptedException, ExecutionException { 
      
      for (int i = 0; i < 100; i++) { 
       asyncTask.doTask1(i); 
      } 
      
      logger.info("All tasks finished."); 
     } 
    }
    

      

     
     

    测试结果如下:

    2017-03-20 20:15:15.208 INFO 4068 --- [ MyExecutor-10] c.c.springboot.async.pool.AsyncTask  : Task60 started. 
    2017-03-20 20:15:15.208 INFO 4068 --- [ MyExecutor-25] c.c.springboot.async.pool.AsyncTask  : Task61 started. 
    2017-03-20 20:15:15.208 INFO 4068 --- [ MyExecutor-6] c.c.springboot.async.pool.AsyncTask  : Task62 started. 
    2017-03-20 20:15:15.208 INFO 4068 --- [ MyExecutor-23] c.c.springboot.async.pool.AsyncTask  : Task63 started. 
    2017-03-20 20:15:15.208 INFO 4068 --- [ MyExecutor-20] c.c.springboot.async.pool.AsyncTask  : Task64 started. 
    2017-03-20 20:15:15.208 INFO 4068 --- [ MyExecutor-19] c.c.springboot.async.pool.AsyncTask  : Task65 started. 
    2017-03-20 20:15:15.208 INFO 4068 --- [ MyExecutor-16] c.c.springboot.async.pool.AsyncTask  : Task66 started. 
    2017-03-20 20:15:15.208 INFO 4068 --- [ MyExecutor-15] c.c.springboot.async.pool.AsyncTask  : Task67 started. 
    2017-03-20 20:15:15.208 INFO 4068 --- [ MyExecutor-12] c.c.springboot.async.pool.AsyncTask  : Task68 started. 
    2017-03-20 20:15:15.209 INFO 4068 --- [ MyExecutor-1] c.c.springboot.async.pool.AsyncTask  : Task69 started. 
    2017-03-20 20:15:15.209 INFO 4068 --- [ MyExecutor-11] c.c.springboot.async.pool.AsyncTask  : Task81 started. 
    2017-03-20 20:15:15.209 INFO 4068 --- [ MyExecutor-8] c.c.springboot.async.pool.AsyncTask  : Task82 started. 
    2017-03-20 20:15:15.209 INFO 4068 --- [ MyExecutor-7] c.c.springboot.async.pool.AsyncTask  : Task83 started. 
    2017-03-20 20:15:15.209 INFO 4068 --- [ MyExecutor-4] c.c.springboot.async.pool.AsyncTask  : Task84 started. 
    2017-03-20 20:15:15.209 INFO 4068 --- [ MyExecutor-29] c.c.springboot.async.pool.AsyncTask  : Task85 started. 
    2017-03-20 20:15:15.209 INFO 4068 --- [ MyExecutor-21] c.c.springboot.async.pool.AsyncTask  : Task86 started. 
    2017-03-20 20:15:15.209 INFO 4068 --- [ MyExecutor-17] c.c.springboot.async.pool.AsyncTask  : Task88 started.
    

      

       

    测试结果ok!

     

    六、配置默认的线程池

    如果我们想使用默认的线程池,但是只是想修改默认线程池的配置,那怎么做呢?此时我们需要实现AsyncConfigurer接口,示例代码如下:

    import java.lang.reflect.Method; 
    import java.util.concurrent.Executor; 
    import java.util.concurrent.ThreadPoolExecutor; 
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; 
    import org.springframework.beans.factory.annotation.Autowired; 
    import org.springframework.context.annotation.Configuration; 
    import org.springframework.scheduling.annotation.AsyncConfigurer; 
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 
    import com.chhliu.cq.emailservice.threadconfiguration.TaskThreadPoolConfig; 
    import lombok.extern.slf4j.Slf4j; 
      
    /** 
     * 注意:该线程池被所有的异步任务共享,而不属于某一个异步任务 
     * 描述:配置异步任务的线程池 
     * @author chhliu 
     * 创建时间:2017年5月22日 上午10:20:56 
     * @version 1.2.0 
     */
    @Slf4j
    @Configuration
    public class AsyncTaskExecutePool implements AsyncConfigurer{ 
      
     @Autowired
     private TaskThreadPoolConfig config; // 配置属性类,见上面的代码 
      
     @Override
     public Executor getAsyncExecutor() { 
      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
      executor.setCorePoolSize(config.getCorePoolSize()); 
      executor.setMaxPoolSize(config.getMaxPoolSize()); 
      executor.setQueueCapacity(config.getQueueCapacity()); 
      executor.setKeepAliveSeconds(config.getKeepAliveSeconds()); 
      executor.setThreadNamePrefix("taskExecutor-"); 
      
      // rejection-policy:当pool已经达到max size的时候,如何处理新任务 
      // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 
      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 
      executor.initialize(); 
      return executor; 
     } 
      
     @Override
     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {// 异步任务中异常处理 
      return new AsyncUncaughtExceptionHandler() { 
         
       @Override
       public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) { 
        log.error("=========================="+arg0.getMessage()+"=======================", arg0); 
        log.error("exception method:"+arg1.getName()); 
       } 
      }; 
     } 
    }
    

      

       
  • 相关阅读:
    转换流--OutputStreamWriter类与InputStreamReader类
    Android getResources的作用和须要注意点
    sqlit使用要点之引入libsqlite3.dylib
    C语言文件操作之fgets()
    5款伊思儷超媒體繁体游戏 中文简体补丁
    memcpy的使用方法总结
    开发人员改变世界的初心
    expect
    HDU 1061 N^N (n的n次方的最后一位)
    linux杂谈(二十):apache服务配置
  • 原文地址:https://www.cnblogs.com/sbj-dawn/p/9075347.html
Copyright © 2011-2022 走看看