zoukankan      html  css  js  c++  java
  • 使用CyclicBarrier+线程池,按总页数分批次开多线程执行逻辑

      通过CyclicBarrier+线程池的方式,同步的方式分页分批次并发高效处理逻辑,将总页数分成多个批次并发执行每页逻辑,每个批次处理DO_MAX_SIZE个页,每个批次等待DO_MAX_SIZE个页数处理完成后才执行下一个批次,并等待所有批次执行完成才处理后续逻辑

      以下代码只需要在TODO处添加上自己的逻辑就可以达到处理效果

        /**
         * 线程池初始化,也可用其它初始化方式
         */
        private ExecutorService threadPool = new ThreadPoolExecutor(10, 50, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
    
        /**
         * 最大并发数量
         */
        final static int DO_MAX_SIZE = 8;
    
        /**
         * 线程批量并发执行数据,分页分线程处理,每页处理分配一个线程处理相关逻辑
         *  最大并发线程{@code DO_MAX_SIZE}
         * @param total 总条数
         * @author wl
         * @date 2019-08-20 18:21
         */
        private void executorMultiThreadPool (Long total) throws Exception {
            int startPage = 1, endPage = 0, pageSize = 200;
            // 按每页pageSize条算出总页数
            Long pageTotal = (total  +  pageSize  - 1) / pageSize;
            // 循环总页数然后按一组DO_MAX_SIZE个页来并发处理
            for (int i = 1; i <= pageTotal; i++) {
                endPage++;
                if (i % DO_MAX_SIZE != 0 && i != pageTotal) {
                    continue;
                }
                long timeStart = System.currentTimeMillis();
                int cbNum = endPage - startPage + 1;
                // 参数parties+1是因为给主线程添加了await,需要等待整个批次完成才执行后续逻辑;
                final CyclicBarrier cb = new CyclicBarrier(cbNum + 1);
                // 当前批次下需要处理的页数
                for (int page = startPage; page <= endPage; page++) {
                    final int pageNum = page;
                    threadPool.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // TODO: 处理当前页 pageNum 逻辑,此处可以查询pageNumb的数据,然后处理相关逻辑
                            } catch (Exception e) {
                                // TODO: 异常处理
                            } finally {
                                try {
                                    // 设置等待
                                    cb.await();
                                } catch (Exception e) {
                                    logger.info(">>>>>Finally处理异常", e);
                                }
                            }
                        }
                    });
                }
                // 等待全部处理
                try {
                    // 等待本批次全部处理完成
                    cb.await();
                } catch (Exception e) {
                    throw e;
                } finally {
                    logger.info(">>>>>线程池批次执行-批次完成用时[{}]", System.currentTimeMillis() - timeStart);
                }
                // 本批次处理完后,将结束页赋给开始
                startPage = endPage + 1;
            }
            logger.info(">>>>>线程池处理所有批次-执行完成");
        }
  • 相关阅读:
    STP生成树协议
    Fiddler快速入门(还有一个功能就是不经过网络,直接模拟一个响应返回给客户端)
    Qt宏Q_OBJECT展开记录
    rem_taobaofix.js
    yield return
    NET full stack framework
    API 网关
    Redis主从高可用缓存
    数据异构解决方案缓存一致性和跨服务器查询
    NET Core 防止跨站请求
  • 原文地址:https://www.cnblogs.com/wanglu/p/11387044.html
Copyright © 2011-2022 走看看