1、引言
在开发中,有时会遇到批量处理的业务。如果单线程处理,速度会非常慢,可能会导致上游超时。这是就需要使用多线程开发。
创建线程时,应当使用线程池。一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
可以使用J.U.C提供的线程池:ThreadPoolExecutor类。在Spring框架中,也可以使用ThreadPoolTaskExecutor类。ThreadPoolTaskExecutor其实是对ThreadPoolExecutor的一种封装。
2、使用ThreadPoolExecutor类
假设现有业务,输入Input类,输出Output类:
@Data @AllArgsConstructor public class Input { int i; } @Data @AllArgsConstructor public class Output { boolean success; String s; }
这里@Data与@AllArgsConstrutor使用了Lombok工具
处理方法:
public Output singleProcess(Input input) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return new Output(false, null); } return new Output(true, String.valueOf(2 * input.getI() + 1)) }
现在该业务需要批量处理,输入List<Input>,输出List<Output>。那么可以创建一个核心线程数为4的线程池,每个线程把执行结果添加到线程安全的List中。这里List应当使用SynchronizedList而不是CopyOnWriteArrayList,因为这里写操作有多次,而读操作只有一次。并使用CountDownLatch等待所有线程执行结束:
public List<Output> multiProcess(List<Input> inputList) { ExecutorService executorService = Executors.newFixedThreadPool(4); CountDownLatch countDownLatch = new CountDownLatch(inputList.size()); List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size())); for (Input input : inputList) { executorService.submit(() -> { try { // 单个处理 Output output = singleProcess(input); outputList.add(ouput); } catch (Exception e) { // 处理异常 } finally { countDownLatch.countDown(); } }) } // 等待所有线程执行完成 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return outputList; }
但是这样还是有很大的问题:
- 阿里巴巴开发手册不建议我们使用Executors创建线程池,因为Executors.newFixedThreadPool方法没有限制线程队列的容量,如果input数量过多,可能导致OOM。
- multiProcess不适合被多次调用,不适合用在大多数业务场景。
3、在Spring框架中使用ThreadPoolTaskExecutor类
为了应对大多数业务场景,配合Spring Boot框架,我们可以使用ThreadPoolTaskExecutor创建线程池,并把它注入到ioc容器中,全局都可以使用。
首先,配置线程池参数
@Data @Component @ConfigurationProperties(prefix = "thread-pool") public class ThreadPoolProperties { private int corePoolSize; private int maxPoolSize; private int queueCapacity; private int keepAliveSeconds; }
在配置文件application.yml中
thread-pool:
core-pool-size: 4
max-pool-size: 16
queue-capacity: 80
keep-alive-seconds: 120
这里线程池各参数的意义可以参考Java线程池实现原理及其在美团业务中的实践
其次,将ThreadPoolTaskExecutor加入至ioc容器中
@EnableAsync @Configuration public class ThreadPoolConfig { private final ThreadPoolProperties threadPoolProperties; @Autowired public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) { this.threadPoolProperties = threadPoolProperties; } @Bean(name = "threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(threadPoolProperties.getCorePoolSize()); executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize()); executor.setQueueCapacity(threadPoolProperties.getQueueCapacity()); executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds()); executor.setThreadNamePrefix("thread-pool-"); return executor; } }
这里@EnableAsync是与@Async配合使用,用于执行异步任务,后面会给出示例
最后,在业务类中通过自定义SpringUtils类获取bean或使用@Async,来使用线程池。
/** * 业务实现类 */ @Service @Slf4j public class Input2OutputServiceImpl implements Input2OutputService { /** * 单个处理 * @param input 输入对象 * @return 输出对象 */ @Override public Output singleProcess(Input input) { log.info("Processing..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return new Output(false, null); } return new Output(true, String.valueOf(2 * input.getI() + 1)); } /** * 批量处理 * @param inputList 输入对象列表 * @return 输出对象列表 */ @Override public List<Output> multiProcess(List<Input> inputList) { ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class); CountDownLatch latch = new CountDownLatch(inputList.size()); List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size())); for (Input input : inputList) { executor.execute(() -> { try { Output output = singleProcess(input); outputList.add(output); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return outputList; } /** * 异步处理 * @param input 输入对象 * @return 输出Future对象 */ @Async("threadPoolTaskExecutor") @Override public Future<Output> asyncProcess(Input input) { return new AsyncResult<>(singleProcess(input)); } }
以上代码的完整代码包括测试代码在笔者的GitHub项目thread-pool-demo,在项目中用到ThreadPoolTaskExecutor可参考。