zoukankan      html  css  js  c++  java
  • List集合分批多线程处理,同时控制最大并发

    业务中,要实现数据日终同步,采用将同步文件中的数据封装成List集合分批处理加多线程的方式,根据数据量动态设置线程数,同时控制最大并发数量(业务中有IO操作,避免过大并发导致堵塞),实现效率提高

    //最大线程数控制
    private static int MAX_THREADS= 5;
    //跑批分页大小
    private static int EXPIRED_PAGE_SIZE = 30;
    
    private void dataHandler(List<SyncFileDto> list) {
        //处理数据数量
        int listSize = list.size();
        //线程数
        int runSize;
        if (listSize % EXPIRED_PAGE_SIZE == 0) {
            runSize = (listSize / EXPIRED_PAGE_SIZE);
        } else {
            runSize = (listSize / EXPIRED_PAGE_SIZE) + 1;
        }
        ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(runSize);
        CountDownLatch countDownLatch = new CountDownLatch(runSize);
        //最大并发线程数控制
        final Semaphore semaphore = new Semaphore(MAX_THREADS);
        List handleList = null;
        for (int i = 0; i < runSize; i++) {
            if ((i + 1) == runSize) {
                int startIndex = i * EXPIRED_PAGE_SIZE;
                int endIndex = list.size();
                handleList = list.subList(startIndex, endIndex);
            } else {
                int startIndex = i * EXPIRED_PAGE_SIZE;
                int endIndex = (i + 1) * EXPIRED_PAGE_SIZE;
                handleList = list.subList(startIndex, endIndex);
            }
            SyncTask task = new SyncTask(handleList, countDownLatch, semaphore);
            executor.execute(task);
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{
        executor.shutdown();
        }
    }
    
    class SyncTask implements Runnable {
        private List<SyncFileDto> list;
        private CountDownLatch countDownLatch;
        private Semaphore semaphore;
    
        public SyncSyncTask(List<SyncFileDto> list, CountDownLatch countDownLatch, Semaphore semaphore) {
            this.list = list;
            this.countDownLatch = countDownLatch;
            this.semaphore = semaphore;
        }
    
        @Override
        public void run() {
            if (!CollectionUtils.isEmpty(list)) {
                try {
                    semaphore.acquire();
                    list.stream().forEach(fileDto -> {
                        //业务处理
                    });
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
    
            }
            //线程任务完成
            countDownLatch.countDown();
        }
    }

     上面是通过手动数据分片,CountDownLatch计数器闭锁和Semaphore限流的方式进行的并发控制,后期改造时发现逻辑较复杂,因此改变线程池的类型,创建可控制的线程池ThreadPoolExecutor(该线程池也是ScheduledThreadPoolExecutor的父类),自定义其属性实现跑批线程池线程数量及并发量可控。

    ExecutorService fixedThreadPool = new ThreadPoolExecutor(INIT_NTHREADS, INIT_NTHREADS, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());

    该线程池的使用我在后面博客【地址】 中有介绍,可移步阅读。

  • 相关阅读:
    在测试自定义starter时,若出现无法找到helloservice的Bean的解决方法
    springboot项目启动后tomcat服务器自动关闭 解决方法
    spring-ioc注解-理解2 零配置文件
    spring-ioc的注解 理解-1
    spring-ioc心得
    springboot的自动配置
    容器关系
    编写程序要做到结构、层次清晰明朗
    maven依赖的jar下载(在指定的仓库中)
    思考:开发的环境问题是一个大问题,也是首先要解决的问题,然后才能顺畅进入开发工作?
  • 原文地址:https://www.cnblogs.com/zjfjava/p/11065698.html
Copyright © 2011-2022 走看看