添加一个类ThreadPoolConfig.java
package com.cjcx.inter.framework.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class ThreadPoolConfig { /** Set the ThreadPoolExecutor's core pool size. */ private int corePoolSize = 3; /** Set the ThreadPoolExecutor's maximum pool size. */ private int maxPoolSize = 7; /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */ private int queueCapacity = 5; @Bean public Executor interfaceAsync() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix("interface-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
方法调用
/**
 * Uploads one receipt and waits at most five seconds for the outcome.
 *
 * <p>The upload is started through {@code testAync(dto)}; this method then blocks on the
 * returned future with a timeout instead of polling it in a loop. The upload counts as
 * successful only when the result map's {@code "errorCode"} entry parses to {@code 0}.
 * On timeout the future is cancelled; any other failure is logged and reported as
 * {@code false}.
 *
 * <p>NOTE(review): if {@code testAync} is declared on this same bean, this direct call
 * presumably bypasses Spring's async proxy and runs synchronously, in which case the
 * timeout below can never fire — confirm how the method is wired.
 *
 * @param dto receipt to upload; its order number is included in every log line
 * @return {@code true} if the upload finished in time with error code 0, otherwise {@code false}
 */
public boolean doSendTask(ShoppingReceiptContentDto dto) {
    // Upper bound on how long we wait for the upload result, in milliseconds.
    final long timeoutMillis = 5000L;
    boolean success = false;
    logger.info("单号:{}, ====开始上传数据", dto.getOrderNum());
    try {
        Future<HashMap<String, Object>> future = testAync(dto);
        try {
            // Block until the task completes or the timeout elapses (no busy-waiting).
            HashMap<String, Object> futureResult =
                    future.get(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
            Object errorCode = futureResult.get("errorCode");
            logger.info("单号:{}, ====上传的结果是:{}", dto.getOrderNum(), errorCode);
            success = Integer.parseInt(errorCode.toString()) == 0;
        } catch (java.util.concurrent.TimeoutException e) {
            // No response within five seconds: give up and cancel the task.
            logger.info("单号:{}, ====上传超时,5秒钟内K11服务器无返回", dto.getOrderNum());
            future.cancel(true);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can react to it.
        Thread.currentThread().interrupt();
        logger.error("单号:{}, ====上传被中断", dto.getOrderNum(), e);
    } catch (Exception e) {
        // Task failure, cancellation, or a missing/malformed errorCode: treat as failed.
        logger.error("单号:{}, ====上传异常", dto.getOrderNum(), e);
    }
    logger.info("单号:{}, ====结束, 上传{}", dto.getOrderNum(), (success ? "成功" : "失败"));
    return success;
}
@Async("interfaceAsync") public Future<HashMap<String, Object>> testAync(ShoppingReceiptContentDto dto) throws InterruptedException { HashMap<String, Object> map = new HashMap<>(); String k11Response = ""; try { //处理数据map.put("errorCode", 0); //测试先改为成功 } catch (Exception e) { e.printStackTrace(); map.put("errorCode", -10001); } return new AsyncResult<>(map); }
@Async 的内部实现也就是 FutureTask