zoukankan      html  css  js  c++  java
  • 多线程分批处理集合(可扩展为分批从数据库中读取数据)的测试一例子【我】

    任务类:

    import java.util.List;
    import java.util.Map;
    
    
    public class MyTask implements Runnable {
        
        //当前待处理数据集合
        private List dataList;
        //其他参数Map
        private Map paramMap;
        
        public MyTask() {
            super();
        }
    
        public MyTask(List dataList, Map paramMap) {
            super();
            this.dataList = dataList;
            this.paramMap = paramMap;
        }
    
        public List getDataList() {
            return dataList;
        }
    
        public void setDataList(List dataList) {
            this.dataList = dataList;
        }
    
        public Map getParamMap() {
            return paramMap;
        }
    
        public void setParamMap(Map paramMap) {
            this.paramMap = paramMap;
        }
    
    
        @Override
        public void run() {
            try {
                
                long threadStartTime = System.currentTimeMillis();
    //            System.out.println("--T--线程: {"+Thread.currentThread().getName()+"} -- 开始执行,当前批次数据: {"+dataList.size()+"} 条,线程数:{"+paramMap.get("threadNum")+"},批次数:{"+paramMap.get("batchNum")+"},当前模值: {"+paramMap.get("mod")+"},文档待处理总文件数:{"+paramMap.get("dataNum")+"},文档ID:{}");
                System.out.println("--T--线程: {"+Thread.currentThread().getName()+"} -- 开始执行,当前批次数据: {"+dataList.size()+"} 条,当前模值: "+paramMap.get("mod"));
                for (int y = 0; y < dataList.size(); y++) {
                
                    Object object = dataList.get(y);
                    
                    try {
                        long st = System.currentTimeMillis();
    //                    System.out.println("--T--线程: {"+Thread.currentThread().getName()+"正在处理的数据是:"+object);
                        Thread.sleep(10);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("--T--线程: {"+Thread.currentThread().getName()+"} -- 结束执行,当前批次数据: {"+dataList.size()+"} 条,当前模值: {"+paramMap.get("mod")+"},当前线程总耗时:"+(System.currentTimeMillis() - threadStartTime));
            } catch (Exception e) {
                e.printStackTrace();
            }
        
        }
    
    }

    测试类:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class T2 {
    
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            
            ArrayList<Object> dataList = new ArrayList<>();
            for (int i = 0; i < 193570; i++) {
                dataList.add(i);
            }
            
            long t1 = System.currentTimeMillis();
            // 创建线程服务
            ExecutorService exec = null;
            
            //数据总数
            int dataNum = dataList.size();
            
            // 线程数默认为1,数据小于等于100时使用
            int threadNum = 1;
            // 分隔数据的批次数
            int batchNum = 1;
            // 系统能承受最大线程数 batch.maxThreadNum 取自配置文件
    //        int maxThreadNum = Integer.parseInt(PropertieUtil.getConfig("maxThreadNum"));
            int maxThreadNum = 100;
            // 默认一次处理100条左右
            int onceNum = 100;
    
            if (dataNum <= 100) {
                batchNum = 1;
                exec = Executors.newCachedThreadPool();
            } else if (100 < dataNum && dataNum <= 10000) {
    
                // 批次数不会大于100
                batchNum = dataNum / onceNum;
    
                if (batchNum > maxThreadNum) {
                    // 设置固定线程数100
                    threadNum = maxThreadNum;
                } else {
                    // 线程数等于批次数
                    threadNum = batchNum;
                }
                // 开启缓存线程池
                exec = Executors.newCachedThreadPool();
            } else if (dataNum > 10000) {
                // 计划每批次500条左右
                onceNum = 500;
                // 批次数计算
                batchNum = dataNum / onceNum; // bathNum 范围在20到400之间
                if (batchNum > maxThreadNum) {
                    // 设置固定线程数100
                    threadNum = maxThreadNum;
                } else {
                    // 线程数等于批次数
                    threadNum = batchNum;
                }
                // 开启固定线程池
                exec = Executors.newFixedThreadPool(threadNum);
            }
    
            System.out.println("--B--预计线程数为:{"+threadNum+"},预计批次数:{"+batchNum+"},总待处理数量为:{"+dataNum+"}");
            // 定义多线程相关
            // final Semaphore semaphore = new Semaphore(10);
            // ExecutorService exec = Executors.newCachedThreadPool();
    
            // 处理的文件总数(查询出的)
            int sumHandler = 0;
            
            // 根据批次数启动线程
            for (int i = 0; i < batchNum; i++) {
                // 根据线程数和当前模值查出一批数据
                ArrayList onceList = new ArrayList();
                //根据对分批数量的模值切割数据
                for (int j = 0; j < dataNum; j++) {
                    //用数据的id(这里是用数据集合的角标模拟)对 批次数量 取模,进行切分
                    //【实际项目中这步是用sql语句从数据库中按照相同的条件查询数据】
                    if (j%batchNum==i) {
                        onceList.add(dataList.get(j));
                    }
                }
    //            System.out.println("-----主线程中的i:"+i);
                //每个线程用一个参数Map【注意:这里必须在循环内部new参数map,如果在循环外,会出现问题】
                HashMap paramMap = new HashMap();
                paramMap.put("dataNum", dataNum);
                paramMap.put("batchNum", batchNum);
                paramMap.put("threadNum", threadNum);
                //当前模值
                paramMap.put("mod", i);
                // 开启线程
                Runnable task = new MyTask(onceList, paramMap);
                exec.submit(task);
                //计数
                sumHandler += onceList.size();
            }
            exec.shutdown();
            // exec.awaitTermination(1, TimeUnit.HOURS);
            while (true) {
                if (exec.isTerminated()) {
                    System.out.println("--B--所有子线程都结束了,共计校验记录:{"+sumHandler+"}");
                    break;
                }
            }
            System.out.println("--B--总耗时:"+(System.currentTimeMillis()-t1));
        }
        
    }
  • 相关阅读:
    POJ 2991 Crane(线段树)
    HDU 1754 I Hate It(线段树)
    HDU 1754 I Hate It(线段树)
    HDU 1166 敌兵布阵 (线段树模版题)
    HDU 1166 敌兵布阵 (线段树模版题)
    Tree Recovery
    Tree Recovery
    情人节的电灯泡(二维树状数组)
    情人节的电灯泡(二维树状数组)
    【LeetCode】Validate Binary Search Tree 二叉查找树的推断
  • 原文地址:https://www.cnblogs.com/libin6505/p/10675029.html
Copyright © 2011-2022 走看看