zoukankan      html  css  js  c++  java
  • java~并行计算~大集合的并行处理

    上一次写了关于《FunctionalInterface~一个批量处理数据的类》和《Future和Callable实现大任务的并行处理》的文章,本讲主要结合实际应用,来封装一个集合并行处理组件,我们的集合分为数据库查询出现的分页集合;还有一个是内存的集合,今天主要说一下内存集合的并行处理。

    场景介绍

    • 有一个比较耗时的工作,将top 400的用户的行为信息统计
    • 统计的信息来自很多业务,很多服务,不能使用聚合直接计算
    • 这些业务统计的时间,大概每个人平均需要1秒
    • 这些用户的各种类型,彼此独立,没有关系

    如何设计

    如果直接顺序写代码,那1万的用户,需要400秒的时间,这是我们不能接受的,我们使用并行编程8秒就把它搞定。

    如何实现

    • 400的集合,进行拆分,每100个为一组,分为4组(4页)
    • 对每100个集合进行拆分,每2个为1组,将100个分成了50组
    • 对50组数据,开50个线程并行处理,结果为2行完成
    • 400的信息,分成了4页,每页2秒,一共8秒

    代码实现

    /**
     * 数据集并行处理工具
     */
    public class DataHelper {
        /**
         * 并行处理线程数字
         */
        static final int THREAD_COUNT = 50;
        /**
         * 单线程中处理的集合的长度,50个线程,每个线程处理2条,如果处理时间为1S,则需要2S的时间.
         */
        static final int INNER_LIST_LENGTH = 2;
        static Logger logger = LoggerFactory.getLogger(DataHelper.class);
    
        /**
         * 大集合拆分.
         *
         * @param list
         * @param len
         * @param <T>
         * @return
         */
        private static <T> List<List<T>> splitList(List<T> list, int len) {
            if (list == null || list.size() == 0 || len < 1) {
                return null;
            }
            List<List<T>> result = new ArrayList<List<T>>();
            int size = list.size();
            int count = (size + len - 1) / len;
            for (int i = 0; i < count; i++) {
                List<T> subList = list.subList(i * len, ((i + 1) * len > size ? size : len * (i + 1)));
                result.add(subList);
            }
            return result;
        }
    
        /**
         * 并行处理.
         *
         * @param list     大集合
         * @param pageSize 单页数据大小
         * @param consumer 处理程序
         * @param <T>
         */
        public static <T> void fillDataByPage(List<T> list,
                                              int pageSize,
                                              Consumer<T> consumer) {
    
            List<List<T>> innerList = new ArrayList<>();
            splitList(list, pageSize).forEach(o -> innerList.add(o));
            int totalPage = innerList.size();
            AtomicInteger i = new AtomicInteger();
            innerList.forEach(items -> {
                ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
                i.getAndIncrement();
                Collection<BufferInsert<T>> bufferInserts = new ArrayList<>();
                splitList(items, INNER_LIST_LENGTH).forEach(o -> {
                    bufferInserts.add(new BufferInsert(o, consumer));
                });
    
                try {
                    executor.invokeAll(bufferInserts);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                executor.shutdown();
                logger.info("【当前数据页:{}/{}】", i.get(), totalPage);
            });
    
        }
    
        /**
         * 多线程并发处理数据.
         *
         * @param <T>
         */
        static class BufferInsert<T> implements Callable<Integer> {
            /**
             * 要处理的数据列表.
             */
            List<T> items;
            /**
             * 处理程序.
             */
            Consumer<T> consumer;
    
            public BufferInsert(List<T> items, Consumer<T> consumer) {
                this.items = items;
                this.consumer = consumer;
            }
    
            @Override
            public Integer call() {
                for (T item : items) {
                    this.consumer.accept(item);
                }
                return 1;
            }
        }
    
    }
    

    调用代码

        /**
         * 8秒处理400个任务,每个任务执行时间为1S,并行的威力
         */
        @Test
        public void test() {
            List<Integer> sumList = new ArrayList<>();
            for (int i = 0; i < 400; i++) {
                sumList.add(i);
            }
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            DataHelper.fillDataByPage(sumList, 100, (o) -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            stopWatch.stop();
            System.out.println("time:" + stopWatch.getTotalTimeMillis());
        }
    

    结果截图

  • 相关阅读:
    【解读】Https协议
    【解读】Http协议
    tomcat中AJP协议和HTTP协议的区别
    TOMCAT原理详解及请求过程
    Redis持久性——RDB和AOF
    redis配置文件解读
    HttpClient优化
    crontab与系统时间不一致
    天兔(Lepus)监控操作系统(OS)安装配置
    MySQL 优化之 index_merge (索引合并)
  • 原文地址:https://www.cnblogs.com/lori/p/15251165.html
Copyright © 2011-2022 走看看