一.需求分析
最近接到一个需求,导入十万级,甚至可能百万数据量的记录了车辆黑名单的Excel文件,借此机会分析下编码过程;
首先将这个需求拆解,发现有三个比较复杂的问题:
问题一:Excel文件导入后首先要被解析为存放对象的列表,数据量大的情况下可能会导致内存溢出,解析时间过长;
问题二:插入数据库的时候,数据量大,写入的时间长
问题三:要对数据库中的现有数据进项判断,不仅仅要做插入动作,还要将数据库的数据与导入的数据对比,判断是否做更新操作
其中:
问题一和问题三,可以看做同一类,因为主要涉及内存计算导致的性能问题,以及内存占用过大的溢出问题,
关于这两个问题,现在线上的机器基本上是4核8G的配置集群部署,内存并不是关键,我会在另一篇文章中给出我的方案,
今天主要针对问题二,写入的数据库的问题给出我的方案,
问题二主要是多次写入数据库的问题,显然,如果有几十万条数据,那么是不可能连续写几十万次的,不然要写到后年马月才能全部入库,
解决方案:
这里我主要采用了多线程的写入方式,十万条数据,2000条写一次(可以自己定义),用线程池提交多个线程任务同时写入,提高性能
二.代码环境
Springboot2.1.3+POI+PGSQL
controller层代码
@PostMapping("/upload") public void upload1(MultipartFile file, @Validated UploadReq req) throws Exception { //从数据库查询出现有的数据,根据去重的字段分组去构建成一个HashMap,通过containsKey()判断 //将需要更新的数据放到updateList中 List<User> updateList=new ArrayList<>(); //已取值的行数 int rowNum = 0; //列号 int colNum = 0; //真正有数据的行数 int realRowCount = 0; //得到工作空间 Workbook workbook = null; try { workbook = ExcelUtil.getWorkbookByInputStream(file.getInputStream(), file.getOriginalFilename()); } catch (IOException e) { e.printStackTrace(); } //得到工作表 int numberOfSheets = workbook.getNumberOfSheets(); for (int i = 0; i < numberOfSheets; i++) { Sheet sheet = ExcelUtil.getSheetByWorkbook(workbook, i) realRowCount = sheet.getPhysicalNumberOfRows(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); List<User> list = new ArrayList<>(); User user = null; for(Row row:sheet) { if(realRowCount == rowNum) { break; } //空行跳过 if(ExcelUtil.isBlankRow(row)) { continue; } if(row.getRowNum() == -1) { continue; }else { //第一行表头跳过 if(row.getRowNum() == 0) { continue; } } rowNum ++; colNum = 1; user = new User(); ExcelUtil.validCellValue(sheet, row, colNum, "id"); user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1))); ExcelUtil.validCellValue(sheet, row, ++ colNum, "name"); user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1))); //判断是否是已存在的数据,如果是就更新,不是就新增 //updateList.add(user); list.add(user); } //新增的逻辑 userService.saveBatch(list); System.out.println(list); } }
service层代码
@Service public class UserServiceImpl implements IUserService { @Autowired private UserMapper userMapper; @Override public void saveBatch(List<User> list) throws Exception { //一个线程处理200条数据 int count = 200; //数据集合大小 int listSize = list.size(); //开启的线程数 int runSize = (listSize / count) + 1; //存放每个线程的执行数据 List<User> newlist = null; //创建一个线程池,数量和开启线程的数量一样 //Executors 的写法 // ExecutorService executor = Executors.newFixedThreadPool(runSize); //ThreadPoolExecutor的写法 ThreadPoolExecutor executor = new ThreadPoolExecutor(runSize, runSize, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy()); //创建两个个计数器 CountDownLatch begin = new CountDownLatch(1); CountDownLatch end = new CountDownLatch(runSize); //循环创建线程 for (int i = 0; i < runSize; i++) { //计算每个线程执行的数据 if ((i + 1) == runSize) { int startIndex = (i * count); int endIndex = list.size(); newlist = list.subList(startIndex, endIndex); } else { int startIndex = (i * count); int endIndex = (i + 1) * count; newlist = list.subList(startIndex, endIndex); } //线程类 ImportThread mythead = new ImportThread(newlist, begin, end,userMapper); //这里执行线程的方式是调用线程池里的executor.execute(mythead)方法。 executor.execute(mythead); } begin.countDown(); end.await(); //执行完关闭线程池 executor.shutdown(); }
线程类
public class ImportThread implements Runnable { public ImportThread() { } UserMapper userMapper; private List<User> list; private CountDownLatch begin; private CountDownLatch end; /** * 方法名: ImportThread * 方法描述: 创建个构造函数初始化 list,和其他用到的参数 * @throws */ public ImportThread(List<User> list, CountDownLatch begin, CountDownLatch end,UserMapper userMapper) { this.list = list; this.begin = begin; this.end = end; this.userMapper=userMapper; } @Override public void run() { try { //执行完让线程直接进入等待 userMapper.saveBatch(list); begin.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //这里要主要了,当一个线程执行完 了计数要减一不然这个线程会被一直挂起 //这个方法就是直接把计数器减一的 end.countDown(); } } }