1、config目录下创建线程池对象
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value("${task.pool.corePoolSize}") private int corePoolSize; @Value("${task.pool.maxPoolSize}") private int maxPoolSize; @Value("${task.pool.keepAliveSeconds}") private int keepAliveSeconds; @Value("${task.pool.queueCapacity}") private int queueCapacity; @Value("${task.pool.threadNamePrefix}") private String threadNamePrefix; @Bean public Executor asyncReleaseServiceExecutor() { logger.info("...ExecutorConfig...asyncServiceExecutor()...启动[发布任务]线程池..."); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setThreadNamePrefix(threadNamePrefix); executor.setKeepAliveSeconds(keepAliveSeconds); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
2、配置文件中添加对应属性
# Thread-pool settings injected into ExecutorConfig through @Value placeholders.
task.pool.corePoolSize=10
task.pool.maxPoolSize=20
# Passed to setKeepAliveSeconds, so the unit is seconds.
task.pool.keepAliveSeconds=300
task.pool.queueCapacity=999
# Passed to setThreadNamePrefix on the executor.
task.pool.threadNamePrefix=Grape-
3、启动类上开启异步任务
/**
 * Spring Boot entry point for the application.
 *
 * <p>{@code @EnableAsync} is declared here so that methods annotated with
 * {@code @Async} are executed asynchronously.
 */
@SpringBootApplication
@EnableAsync
public class TestApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, forwarded unchanged to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
}
4、编写异步执行类和方法
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONPath; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Component; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @Component public class AsyncSendSqlService { private static final Logger logger = Logger.getLogger(AsyncSendSqlService.class); @Async("asyncReleaseServiceExecutor") Future<AsyncSendSqlResult> asyncSendSql(String url, String jsonRequestParams, CountDownLatch latch) { String currentThreadName = Thread.currentThread().getName(); String httpURL = ServiceConstant.STR_HTTP + url + deliveryServerUrl; logger.info("---线程: 【" + currentThreadName + "】 的发布地址: " + url + "---"); long start = System.currentTimeMillis(); String result = null; try { result = OkHttpUtil.postJson(httpURL, jsonRequestParams); } catch (Exception e) { logger.error("---线程: 【" + currentThreadName + "】---AsyncSendSqlService---asyncSendSql()---error:" + e); } logger.info("---线程: 【" + currentThreadName + "】 的返回结果 ::: " + result); logger.info("---线程: 【" + currentThreadName + "】 的请求时间: " + (System.currentTimeMillis() - start) + " ms"); AsyncSendSqlResult asyncSendSqlResult = transformAsyncSendSqlResult(url, result); latch.countDown(); ScriptValueSnapShotEntity scriptValueSnapShotEntity = saveSnapshoot(scriptProject.getId(), jsonRequestParams); return new AsyncResult<>(asyncSendSqlResult); } }
5、调用多线程任务的方法
try { //加闩 CountDownLatch latch = new CountDownLatch(projectUrlList.size()); for (String url : projectUrlList) { Future<AsyncSendSqlResult> asyncSendSqlResultFuture = asyncSendSqlService.asyncSendSql(url, requestParamsJsonStr, latch); asyncSendSqlResultList.add(asyncSendSqlResultFuture.get()); } //等待N个线程执行完毕 latch.await(); } catch (ExecutionException e) { logger.error("---发布多线程异常 ::: " + e); e.getStackTrace(); } catch (InterruptedException e) { logger.error("---发布多线程异常 ::: " + e); e.getStackTrace(); }
注意事项:
1)异步调用的方法要单独放在一个类中,并通过 Spring 注入的 Bean 来调用;在同一个类内部直接调用不会经过代理,@Async 注解不生效
2)需要等待多个线程全部执行完毕后再统一处理结果时,可使用门闩(CountDownLatch 类)实现
3) @Async注解里面的参数和@Bean方法名称保持一致
参考: