zoukankan      html  css  js  c++  java
  • 十万级百万级数据量的Excel文件导入并写入数据库

    一.需求分析

      最近接到一个需求,导入十万级,甚至可能百万数据量的记录了车辆黑名单的Excel文件,借此机会分析下编码过程;

      首先将这个需求拆解,发现有三个比较复杂的问题:

      问题一:Excel文件导入后首先要被解析为存放对象的列表,数据量大的情况下可能会导致内存溢出,解析时间过长;

      问题二:插入数据库的时候,数据量大,写入的时间长

      问题三:要对数据库中的现有数据进项判断,不仅仅要做插入动作,还要将数据库的数据与导入的数据对比,判断是否做更新操作

      

      其中:

      问题一和问题三,可以看做同一类,因为主要涉及内存计算导致的性能问题,以及内存占用过大的溢出问题,

      关于这两个问题,现在线上的机器基本上是4核8G的配置集群部署,内存并不是关键,我会在另一篇文章中给出我的方案,

      今天主要针对问题二,写入的数据库的问题给出我的方案,

      

      问题二主要是多次写入数据库的问题,显然,如果有几十万条数据,那么是不可能连续写几十万次的,不然要写到后年马月才能全部入库,

      解决方案:

        这里我主要采用了多线程的写入方式,十万条数据,2000条写一次(可以自己定义),用线程池提交多个线程任务同时写入,提高性能

    二.代码环境

      Springboot2.1.3+POI+PGSQL

      controller层代码

        @PostMapping("/upload")
        public void upload1(MultipartFile file, @Validated UploadReq req) throws Exception {
            //从数据库查询出现有的数据,根据去重的字段分组去构建成一个HashMap,通过containsKey()判断
            //将需要更新的数据放到updateList中
            List<User> updateList=new ArrayList<>();
    
            //已取值的行数
            int rowNum = 0;
            //列号
            int colNum = 0;
            //真正有数据的行数
            int realRowCount = 0;
            //得到工作空间
            Workbook workbook = null;
    
            try {
                workbook = ExcelUtil.getWorkbookByInputStream(file.getInputStream(), file.getOriginalFilename());
            } catch (IOException e) {
                e.printStackTrace();
            }
            //得到工作表
            int numberOfSheets = workbook.getNumberOfSheets();
            for (int i = 0; i < numberOfSheets; i++) {
                Sheet sheet = ExcelUtil.getSheetByWorkbook(workbook, i)
                realRowCount = sheet.getPhysicalNumberOfRows();
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                List<User> list = new ArrayList<>();
                User user = null;
    
                for(Row row:sheet) {
                    if(realRowCount == rowNum) {
                        break;
                    }
                    //空行跳过
                    if(ExcelUtil.isBlankRow(row)) {
                        continue;
                    }
                    if(row.getRowNum() == -1) {
                        continue;
                    }else {
                        //第一行表头跳过
                        if(row.getRowNum() == 0) {
                            continue;
                        }
                    }
                    rowNum ++;
                    colNum = 1;
                    user = new User();
                    ExcelUtil.validCellValue(sheet, row, colNum, "id");
                    user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1)));
                    ExcelUtil.validCellValue(sheet, row, ++ colNum, "name");
                    user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1)));
                    //判断是否是已存在的数据,如果是就更新,不是就新增
                    //updateList.add(user);
                    list.add(user);
                    
                }
                
                //新增的逻辑
                userService.saveBatch(list);
                System.out.println(list);
            }
        }

      service层代码

    @Service
    public class UserServiceImpl implements IUserService {
    
        @Autowired
        private UserMapper userMapper;
    
    
        @Override
        public void saveBatch(List<User> list) throws Exception {
            //一个线程处理200条数据
            int count = 200;
            //数据集合大小
            int listSize = list.size();
            //开启的线程数
            int runSize = (listSize / count) + 1;
            //存放每个线程的执行数据
            List<User> newlist = null;
    
            //创建一个线程池,数量和开启线程的数量一样
            //Executors 的写法
            // ExecutorService executor = Executors.newFixedThreadPool(runSize);
    
            //ThreadPoolExecutor的写法
            ThreadPoolExecutor executor = new ThreadPoolExecutor(runSize, runSize, 1,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
    
            //创建两个个计数器
            CountDownLatch begin = new CountDownLatch(1);
            CountDownLatch end = new CountDownLatch(runSize);
            //循环创建线程
            for (int i = 0; i < runSize; i++) {
                //计算每个线程执行的数据
                if ((i + 1) == runSize) {
                    int startIndex = (i * count);
                    int endIndex = list.size();
                    newlist = list.subList(startIndex, endIndex);
                } else {
                    int startIndex = (i * count);
                    int endIndex = (i + 1) * count;
                    newlist = list.subList(startIndex, endIndex);
                }
                //线程类
                ImportThread mythead = new ImportThread(newlist, begin, end,userMapper);
                //这里执行线程的方式是调用线程池里的executor.execute(mythead)方法。
                executor.execute(mythead);
            }
            begin.countDown();
            end.await();
            //执行完关闭线程池
            executor.shutdown();
        }

      线程类

    public class ImportThread implements Runnable {
    
    
        public ImportThread() {
        }
    
        UserMapper userMapper;
        private List<User> list;
        private CountDownLatch begin;
        private CountDownLatch end;
    
        /**
         * 方法名: ImportThread
         * 方法描述: 创建个构造函数初始化 list,和其他用到的参数
         * @throws
         */
        public ImportThread(List<User> list, CountDownLatch begin, CountDownLatch end,UserMapper userMapper) {
            this.list = list;
            this.begin = begin;
            this.end = end;
            this.userMapper=userMapper;
        }
    
        @Override
        public void run() {
            try {
                //执行完让线程直接进入等待
                userMapper.saveBatch(list);
                begin.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //这里要主要了,当一个线程执行完 了计数要减一不然这个线程会被一直挂起
                //这个方法就是直接把计数器减一的
                end.countDown();
            }
        }
    
    }

      

      

      

  • 相关阅读:
    ActiveMQ
    Solr学习笔记(4) —— SolrCloud的概述和搭建
    Solr学习笔记(3) —— SolrJ管理索引库&集群
    JAVA 平台
    JMS(Java平台上的专业技术规范)
    zookeeper 分布式管理
    java 类型转换
    聚集索引 非聚类索引 区别 二
    聚集索引 非聚类索引 区别
    阶乘
  • 原文地址:https://www.cnblogs.com/july-sunny/p/11761482.html
Copyright © 2011-2022 走看看