zoukankan      html  css  js  c++  java
  • 多线程并行_countDown

    /**
     * 首次启动加载数据至缓存
     */
    public class ApplicationStartTask {
    
        private static Logger logger = LoggerFactory.getLogger(ApplicationStartTask.class);
        private AtomicInteger loadNum = new AtomicInteger(0);
    
        @Autowired
        private FaceProducer cacheSynchronizationProducer;
        @Autowired
        private FaceConsumer cacheSynchronizationConsumer;
    
        @Autowired
        private FaceInfoService faceInfoService;
    
        /**
         * 当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,
         * <br>等到需要执行的任务数大于线程池基本大小时就不再创建
         */
        private static final int POOL_CORE_SIZE = Runtime.getRuntime().availableProcessors();
    
        /**
         * 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务
         */
        private static final int POOL_MAX_SIZE = Runtime.getRuntime().availableProcessors();
    
        /**
         * 线程池的工作线程空闲后,保持存活的时间 <br>
         * 单位:分
         */
        private static final int POOL_KEEP_ALIVE_TIME = 2;
    
        @Autowired
        @Qualifier("sqliteManagerImpl")
        private SqlieteManager sqlieteManager;
    
        @Autowired
        private FaceCompareManager faceCompareManager;
    
        @Value("${local.sqlite.sqlname}")
        private String sqlName;
    
        @Bean
        public ApplicationStartTask loadCache() {
            logger.info("[启动任务]首次启动载入数据至缓存...");
            checkSqLiteFile();
            List<String> belongIds = sqlieteManager.getAllBelongIds();
            if (ArrayUtil.isNotEmpty(belongIds)) {
    
                loadFromSqlite(belongIds);
    
                initMqServer();
    
                return new ApplicationStartTask();
            }
            loadFromDB();
    
            initMqServer();
            return new ApplicationStartTask();
    
        }
    
        /**
         * 开启MQ服务
         */
        private void initMqServer() {
    
            logger.info("开始开启MQ服务...");
            cacheSynchronizationProducer.producerStart();
            cacheSynchronizationConsumer.getMessage();
        }
    
    
        /**
         * 直接从MYSQL载入数据
         */
        private void loadFromDB() {
            logger.info("当前CPU内核线程数:{}", POOL_CORE_SIZE);
    
            long beginTimsamp = System.currentTimeMillis();
    
            List<String> allBelongId = faceCompareManager.getAllBelongId();
            int size = 0;
            for (String belongId : allBelongId) {
                FaceInfoSyncDataQuery query = new FaceInfoSyncDataQuery(belongId);
                query.setPageSize(1000);
                query.setDeleteStatus(0);
                Result<Integer> result = faceInfoService.getTotalPage(query);
                if (!result.isSuccess() || result.getModel() == 0) {
                    continue;
                }
    
                int totalPage = result.getModel();
    
                size += totalPage;
    
            }
            logger.info("总任务数:{}", size);
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(POOL_CORE_SIZE, POOL_MAX_SIZE, POOL_KEEP_ALIVE_TIME,
                    TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(size));
            CountDownLatch latch = new CountDownLatch(size);
    
            for (String belongId : allBelongId) {
    
                FaceInfoSyncDataQuery query = new FaceInfoSyncDataQuery();
                query.setBelongId(belongId);
                query.setDeleteStatus(0);
                query.setPageSize(1000);
                Result<FaceInfoSyncDataVO> result = faceInfoService.getFaces(query);
                if (!result.isSuccess() || result.getModel() == null || result.getModel().getTotalPages() == 0) {
                    continue;
                }
                FaceInfoSyncDataVO faceInfoSyncDataVO = result.getModel();
    
                int totalPage = faceInfoSyncDataVO.getTotalPages();
                //第一页
    
                poolExecutor.execute(new CacheLoadTask1(1, belongId, faceInfoSyncDataVO.getFaces(), latch));
                if (totalPage == 1) {
                    continue;
                }
                for (int i = 2; i <= totalPage; i++) {
                    query = new FaceInfoSyncDataQuery();
                    query.setBelongId(belongId);
                    query.setPageSize(1000);
                    query.setPageNo(i);
                    query.setDeleteStatus(0);
                    result = faceInfoService.getFaces(query);
                    if (!result.isSuccess() || result.getModel() == null || ArrayUtil.isEmpty(result.getModel().getFaces())) {
                        continue;
                    }
                    faceInfoSyncDataVO = result.getModel();
    
                    poolExecutor.execute(new CacheLoadTask1(i, belongId, faceInfoSyncDataVO.getFaces(), latch));
    
    
                }
            }
            try {
    
                latch.await();// 同步阻塞,直到所有线程工作完成
                // long endTime = System.currentTimeMillis(); // 获取结束时间
    
                poolExecutor.shutdown();// 不允许提交新任务,等待当前任务及队列中的任务全部执行完毕后退出
                // // 支持等待以前提交的任务停止执行
                // // 所有任务关闭请求或线程中断或超时,阻塞取消
                // poolExecutor.awaitTermination(20, TimeUnit.MINUTES);
                // poolExecutor.shutdownNow();//通过Thread.interrupt试图停止所有正在执行的线程,并不再处理还在队列中等待的任务...
            } catch (Exception e) {
                logger.error("[启动任务]线程池异常打印:{}", e);
                poolExecutor.shutdownNow();
    
            }
    //        logger.info("[启动任务]从DB中载入数据至缓存完毕!共载入{}条人脸数据", loadNum.get());
    
            logger.info("[启动任务]完毕!全文索引{}条数据,共耗时:{}s", loadNum.get(), (System.currentTimeMillis() - beginTimsamp) / 1000);
    
    
        }
    
        class CacheLoadTask1 implements Runnable {
    
    
            private CountDownLatch latch;
            List<FaceInfoDO> faceInfoDOs = null;
            int putCacheNum = 0;
            String belongId = null;
            int pageNo = 0;
    
            public CacheLoadTask1(int pageNo, String belongId, List<FaceInfoDO> faceInfoDOs, CountDownLatch latch) {
                this.latch = latch;
                this.faceInfoDOs = faceInfoDOs;
                this.belongId = belongId;
                this.pageNo = pageNo;
                // this.index = index;
            }
    
            @Override
            public void run() {
                Long beginTime = System.currentTimeMillis();
                try {
                    for (FaceInfoDO faceInfoDO : faceInfoDOs) {
                        boolean isPutCache = FaceInfoCacheManage.put(faceInfoDO);
                        if (isPutCache) {
                            putCacheNum++;
                            loadNum.incrementAndGet();
    
                        }
                    }
                    logger.info("[启动任务]线程{}处理完belongId-{}第{}页数据,共载入{}条人脸数据,耗时:{}ms", Thread.currentThread().getName(),
                            belongId, pageNo, putCacheNum, System.currentTimeMillis() - beginTime);
    
                } catch (Exception e) {
                    logger.error("[启动任务]载入sqlite任务[CacheLoadTask]出错:{}", e);
                } finally {
                    // TODO: handle finally clause
                    latch.countDown();
                }
            }
    
        }
    
        /**
         * 从sqlite数据库中加载数据
         */
        private void loadFromSqlite(List<String> belongIds) {
    
            logger.info("当前CPU内核线程数:{}", POOL_CORE_SIZE);
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(POOL_CORE_SIZE, POOL_MAX_SIZE, POOL_KEEP_ALIVE_TIME,
                    TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(belongIds.size()));
            CountDownLatch latch = new CountDownLatch(belongIds.size());
            for (String belongId : belongIds) {
                List<FaceInfoDO> faceInfoDOs = sqlieteManager.selectAllInfoByBelongId(sqlName, belongId);
    
                poolExecutor.execute(new CacheLoadTask(belongId, faceInfoDOs, latch));
    
            }
    
            try {
    
                latch.await();// 同步阻塞,直到所有线程工作完成
                // long endTime = System.currentTimeMillis(); // 获取结束时间
                logger.info("[启动任务]从sqlite中载入数据至缓存完毕!共载入{}条人脸数据", loadNum.get());
    
                poolExecutor.shutdown();// 不允许提交新任务,等待当前任务及队列中的任务全部执行完毕后退出
                // // 支持等待以前提交的任务停止执行
                // // 所有任务关闭请求或线程中断或超时,阻塞取消
                // poolExecutor.awaitTermination(20, TimeUnit.MINUTES);
                // poolExecutor.shutdownNow();//通过Thread.interrupt试图停止所有正在执行的线程,并不再处理还在队列中等待的任务...
            } catch (Exception e) {
                logger.error("[启动任务]线程池异常打印:{}", e);
                poolExecutor.shutdownNow();
    
            }
    
        }
    
    
        private void checkSqLiteFile() {
            File sqliteFile = new File(sqlName + ".sqlite3");
            if (!sqliteFile.exists()) {
                logger.info("[启动任务]sqlite数据库未创建,开始创建...");
                // 第一次启动创建本地表
                try {
                    sqlieteManager.createTable(sqlName);
                } catch (Exception e) {
                    logger.error(e.getMessage());
                }
    
            }
        }
    
    }


  • 相关阅读:
    《我曾》火了:人这辈子,最怕突然听懂这首歌
    SpringMVC的运行流程
    Directive 自定义指令
    Vue 过滤器
    MVC 和 MVVM的区别
    vue指令
    async
    Generator
    单词搜索
    Promise
  • 原文地址:https://www.cnblogs.com/zhangfengshi/p/10070644.html
Copyright © 2011-2022 走看看