zoukankan      html  css  js  c++  java
  • Java多线程批量处理、线程池的使用

    1、引言

    在开发中,有时会遇到批量处理的业务。如果单线程处理,速度会非常慢,可能会导致上游超时。这是就需要使用多线程开发。

    创建线程时,应当使用线程池。一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

    可以使用J.U.C提供的线程池:ThreadPoolExecutor类。在Spring框架中,也可以使用ThreadPoolTaskExecutor类。ThreadPoolTaskExecutor其实是对ThreadPoolExecutor的一种封装。

    2、使用ThreadPoolExecutor类

    假设现有业务,输入Input类,输出Output类:

    @Data
    @AllArgsConstructor
    public class Input {
        int i;
    }
    
    @Data
    @AllArgsConstructor
    public class Output {
        boolean success;
        String s;
    }

    这里@Data与@AllArgsConstrutor使用了Lombok工具

    处理方法:

    public Output singleProcess(Input input) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return new Output(false, null);
        }
        return new Output(true, String.valueOf(2 * input.getI() + 1))
    }

    现在该业务需要批量处理,输入List<Input>,输出List<Output>。那么可以创建一个核心线程数为4的线程池,每个线程把执行结果添加到线程安全的List中。这里List应当使用SynchronizedList而不是CopyOnWriteArrayList,因为这里写操作有多次,而读操作只有一次。并使用CountDownLatch等待所有线程执行结束:

    public List<Output> multiProcess(List<Input> inputList) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CountDownLatch countDownLatch = new CountDownLatch(inputList.size());
        List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size()));
    
        for (Input input : inputList) {
            executorService.submit(() -> {
                try {
                    // 单个处理
                    Output output = singleProcess(input);
                    outputList.add(ouput);
                } catch (Exception e) {
                    // 处理异常
                } finally {
                    countDownLatch.countDown();
                }
            })
        }
    
        // 等待所有线程执行完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        return outputList;
    }

    但是这样还是有很大的问题:

    1. 阿里巴巴开发手册不建议我们使用Executors创建线程池,因为Executors.newFixedThreadPool方法没有限制线程队列的容量,如果input数量过多,可能导致OOM。
    2. multiProcess不适合被多次调用,不适合用在大多数业务场景。

    3、在Spring框架中使用ThreadPoolTaskExecutor类

    为了应对大多数业务场景,配合Spring Boot框架,我们可以使用ThreadPoolTaskExecutor创建线程池,并把它注入到ioc容器中,全局都可以使用。

    首先,配置线程池参数

    @Data
    @Component
    @ConfigurationProperties(prefix = "thread-pool")
    public class ThreadPoolProperties {
        private int corePoolSize;
        private int maxPoolSize;
        private int queueCapacity;
        private int keepAliveSeconds;
    }
    

      

    在配置文件application.yml中

    thread-pool:
      core-pool-size: 4
      max-pool-size: 16
      queue-capacity: 80
      keep-alive-seconds: 120

    这里线程池各参数的意义可以参考Java线程池实现原理及其在美团业务中的实践

    其次,将ThreadPoolTaskExecutor加入至ioc容器中

      

    @EnableAsync
    @Configuration
    public class ThreadPoolConfig {
        private final ThreadPoolProperties threadPoolProperties;
    
        @Autowired
        public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
            this.threadPoolProperties = threadPoolProperties;
        }
    
        @Bean(name = "threadPoolTaskExecutor")
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
            executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
            executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
            executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
            executor.setThreadNamePrefix("thread-pool-");
            return executor;
        }
    }

    这里@EnableAsync是与@Async配合使用,用于执行异步任务,后面会给出示例

    最后,在业务类中通过自定义SpringUtils类获取bean或使用@Async,来使用线程池。

    /**
     * 业务实现类
     */
    @Service
    @Slf4j
    public class Input2OutputServiceImpl implements Input2OutputService {
        /**
         * 单个处理
         * @param input 输入对象
         * @return 输出对象
         */
        @Override
        public Output singleProcess(Input input) {
            log.info("Processing...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                return new Output(false, null);
            }
            return new Output(true, String.valueOf(2 * input.getI() + 1));
        }
    
        /**
         * 批量处理
         * @param inputList 输入对象列表
         * @return 输出对象列表
         */
        @Override
        public List<Output> multiProcess(List<Input> inputList) {
            ThreadPoolTaskExecutor executor
                    = SpringUtils.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
            CountDownLatch latch = new CountDownLatch(inputList.size());
            List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size()));
    
            for (Input input : inputList) {
                executor.execute(() -> {
                    try {
                        Output output = singleProcess(input);
                        outputList.add(output);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                });
            }
    
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return outputList;
        }
    
        /**
         * 异步处理
         * @param input 输入对象
         * @return 输出Future对象
         */
        @Async("threadPoolTaskExecutor")
        @Override
        public Future<Output> asyncProcess(Input input) {
            return new AsyncResult<>(singleProcess(input));
        }
    }

    以上代码的完整代码包括测试代码在笔者的GitHub项目thread-pool-demo,在项目中用到ThreadPoolTaskExecutor可参考。

  • 相关阅读:
    libnids-1.24 使用源码问题
    Linux学习man page
    shell 脚本,提取文件中的内容
    shell中的语法(1)
    python 爬取百度翻译进行中英互译
    matlab等高线绘制
    matlab 对tif数据高程图的处理分析
    python网络爬虫与信息提取 学习笔记day3
    python网络爬虫与信息提取 学习笔记day2
    python网络爬虫与信息提取 学习笔记day1
  • 原文地址:https://www.cnblogs.com/liudandandear/p/15507667.html
Copyright © 2011-2022 走看看