最近公司需要从别的系统同步数据,对方不提供接口,只能通过上传 CSV 文本文件的方式导入,每次的数据量在百万级。
读取数据使用的是带缓冲的文件流,这一步没有性能问题;但是通过 Spring 的批量操作保存到数据库时存在严重的性能问题:按每 1 万条提交一次,全部执行完居然要 5 个小时。
见代码
BufferedRandomAccessFile 类
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

/**
 * A {@link RandomAccessFile} that keeps one block of the file in an in-memory
 * buffer so that single-byte reads and writes (and therefore the inherited
 * {@code readLine()}, which is built on {@code read()}) do not hit the disk on
 * every call.
 *
 * Copyright:
 * Project:
 * Author: diaoby
 * Created:
 * Description:
 */
public class BufferedRandomAccessFile extends RandomAccessFile {
    static final int LogBuffSz_ = 16; // 64K buffer
    public static final int BuffSz_ = (1 << LogBuffSz_);
    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);

    private String path_;

    /*
     * This implementation is based on the buffer implementation in Modula-3's
     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
     */
    private boolean dirty_; // true iff unflushed bytes exist
    private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
    private long curr_; // current position in file
    private long lo_, hi_; // bounds on characters in "buff"
    private byte[] buff_; // local buffer
    private long maxHi_; // this.lo + this.buff.length
    private boolean hitEOF_; // buffer contains last file block?
    private long diskPos_; // disk position

    /*
     * To describe the above fields, we introduce the following abstractions for
     * the file "f":
     *
     * len(f) the length of the file curr(f) the current position in the file
     * c(f) the abstract contents of the file disk(f) the contents of f's
     * backing disk file closed(f) true iff the file is closed
     *
     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
     * operation has the effect of making "disk(f)" identical to "c(f)".
     *
     * A file is said to be *valid* if the following conditions hold:
     *
     * V1. The "closed" and "curr" fields are correct:
     *
     * f.closed == closed(f) f.curr == curr(f)
     *
     * V2. The current position is either contained in the buffer, or just past
     * the buffer:
     *
     * f.lo <= f.curr <= f.hi
     *
     * V3. Any (possibly) unflushed characters are stored in "f.buff":
     *
     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
     *
     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
     *
     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
     * disk(f)[i])
     *
     * V5. "f.dirty" is true iff the buffer contains bytes that should be
     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
     *
     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
     *
     * V6. this.maxHi == this.lo + this.buff.length
     *
     * Note that "f.buff" can be "null" in a valid file, since the range of
     * characters in V3 is empty when "f.lo == f.curr".
     *
     * A file is said to be *ready* if the buffer contains the current position,
     * i.e., when:
     *
     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
     *
     * When a file is ready, reading or writing a single byte can be performed
     * by reading or writing the in-memory buffer without performing a disk
     * operation.
     */

    /**
     * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
     * in mode <code>mode</code>, which should be "r" for reading only, or
     * "rw" for reading and writing.
     */
    public BufferedRandomAccessFile(File file, String mode) throws IOException {
        this(file, mode, 0);
    }

    /**
     * Same as {@link #BufferedRandomAccessFile(File, String)} but with a
     * requested buffer size; sizes not larger than {@link #BuffSz_} fall back
     * to {@link #BuffSz_}.
     */
    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
        super(file, mode);
        path_ = file.getAbsolutePath();
        this.init(size);
    }

    /**
     * Open a new <code>BufferedRandomAccessFile</code> on the file named
     * <code>name</code> in mode <code>mode</code>, which should be "r" for
     * reading only, or "rw" for reading and writing.
     */
    public BufferedRandomAccessFile(String name, String mode) throws IOException {
        this(name, mode, 0);
    }

    /**
     * Same as {@link #BufferedRandomAccessFile(String, String)} but with a
     * requested buffer size; sizes not larger than {@link #BuffSz_} fall back
     * to {@link #BuffSz_}.
     */
    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
        super(name, mode);
        path_ = name;
        this.init(size);
    }

    /* Resets all buffer bookkeeping and allocates the buffer. */
    private void init(int size) {
        this.dirty_ = false;
        this.syncNeeded_ = false;
        this.lo_ = this.curr_ = this.hi_ = 0;
        this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
        // V6: maxHi must follow the real buffer length, which may exceed BuffSz_.
        this.maxHi_ = (long) this.buff_.length;
        this.hitEOF_ = false;
        this.diskPos_ = 0L;
    }

    /** Returns the path this file was opened with (absolute when opened from a {@link File}). */
    public String getPath() {
        return path_;
    }

    /** Flushes the buffer and forces the channel to disk, but only if something was written since the last sync. */
    public void sync() throws IOException {
        if (syncNeeded_) {
            flush();
            getChannel().force(true);
            syncNeeded_ = false;
        }
    }

    // public boolean isEOF() throws IOException
    // {
    // assert getFilePointer() <= length();
    // return getFilePointer() == length();
    // }

    /**
     * Flushes pending bytes and closes the underlying file. The file is
     * closed even when the flush fails, so the descriptor is never leaked;
     * the flush failure still propagates to the caller.
     */
    @Override
    public void close() throws IOException {
        try {
            this.flush();
        } finally {
            this.buff_ = null;
            super.close();
        }
    }

    /**
     * Flush any bytes in the file's buffer that have not yet been written to
     * disk. If the file was created read-only, this method is a no-op.
     */
    public void flush() throws IOException {
        this.flushBuffer();
    }

    /* Flush any dirty bytes in the buffer to disk. */
    private void flushBuffer() throws IOException {
        if (this.dirty_) {
            if (this.diskPos_ != this.lo_) {
                super.seek(this.lo_);
            }
            int len = (int) (this.curr_ - this.lo_);
            super.write(this.buff_, 0, len);
            this.diskPos_ = this.curr_;
            this.dirty_ = false;
        }
    }

    /*
     * Read at most "this.buff.length" bytes into "this.buff", returning the
     * number of bytes read. If the return result is less than
     * "this.buff.length", then EOF was read.
     */
    private int fillBuffer() throws IOException {
        int cnt = 0;
        int rem = this.buff_.length;
        while (rem > 0) {
            int n = super.read(this.buff_, cnt, rem);
            if (n < 0) {
                break;
            }
            cnt += n;
            rem -= n;
        }
        // A short read means the buffer holds the last block of the file.
        // (This used to be guarded by "cnt < 0", which is never true, so
        // hitEOF_ was never set and every append at EOF took the slow
        // flush-seek-reread path instead of staying in the buffer.)
        this.hitEOF_ = (cnt < this.buff_.length);
        if (this.hitEOF_) {
            // make sure buffer that wasn't read is initialized with -1
            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
        }
        this.diskPos_ += cnt;
        return cnt;
    }

    /*
     * This method positions <code>this.curr</code> at position <code>pos</code>.
     * If <code>pos</code> does not fall in the current buffer, it flushes the
     * current buffer and loads the correct one.<p>
     *
     * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
     * is at or past the end-of-file, which can only happen if the file was
     * opened in read-only mode.
     */
    @Override
    public void seek(long pos) throws IOException {
        if (pos >= this.hi_ || pos < this.lo_) {
            // seeking outside of current buffer -- flush and read
            this.flushBuffer();
            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
            this.maxHi_ = this.lo_ + (long) this.buff_.length;
            if (this.diskPos_ != this.lo_) {
                super.seek(this.lo_);
                this.diskPos_ = this.lo_;
            }
            int n = this.fillBuffer();
            this.hi_ = this.lo_ + (long) n;
        } else {
            // seeking inside current buffer -- no read required
            if (pos < this.curr_) {
                // if seeking backwards, we must flush to maintain V4
                this.flushBuffer();
            }
        }
        this.curr_ = pos;
    }

    /** Returns the logical position, which may be ahead of the on-disk position while bytes are buffered. */
    @Override
    public long getFilePointer() {
        return this.curr_;
    }

    @Override
    public long length() throws IOException {
        // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
        return Math.max(this.curr_, super.length());
    }

    /** Reads one byte from the buffer, refilling it when exhausted; returns -1 at end of file. */
    @Override
    public int read() throws IOException {
        if (this.curr_ >= this.hi_) {
            // test for EOF
            // if (this.hi < this.maxHi) return -1;
            if (this.hitEOF_) {
                return -1;
            }
            // slow path -- read another buffer
            this.seek(this.curr_);
            if (this.curr_ == this.hi_) {
                return -1;
            }
        }
        byte res = this.buff_[(int) (this.curr_ - this.lo_)];
        this.curr_++;
        return ((int) res) & 0xFF; // convert byte -> int
    }

    @Override
    public int read(byte[] b) throws IOException {
        return this.read(b, 0, b.length);
    }

    /**
     * Copies up to <code>len</code> bytes out of the current buffer. Like any
     * <code>read</code>, it may return fewer bytes than requested (never more
     * than what is left in the buffer); returns -1 at end of file.
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (this.curr_ >= this.hi_) {
            // test for EOF
            // if (this.hi < this.maxHi) return -1;
            if (this.hitEOF_) {
                return -1;
            }
            // slow path -- read another buffer
            this.seek(this.curr_);
            if (this.curr_ == this.hi_) {
                return -1;
            }
        }
        len = Math.min(len, (int) (this.hi_ - this.curr_));
        int buffOff = (int) (this.curr_ - this.lo_);
        System.arraycopy(this.buff_, buffOff, b, off, len);
        this.curr_ += len;
        return len;
    }

    /** Writes one byte into the buffer; the disk is only touched when the buffer has to be replaced. */
    @Override
    public void write(int b) throws IOException {
        if (this.curr_ >= this.hi_) {
            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                // at EOF -- bump "hi"
                this.hi_++;
            } else {
                // slow path -- write current buffer; read next one
                this.seek(this.curr_);
                if (this.curr_ == this.hi_) {
                    // appending to EOF -- bump "hi"
                    this.hi_++;
                }
            }
        }
        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
        this.curr_++;
        this.dirty_ = true;
        syncNeeded_ = true;
    }

    @Override
    public void write(byte[] b) throws IOException {
        this.write(b, 0, b.length);
    }

    /** Writes <code>len</code> bytes, one buffer-sized piece at a time. */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        while (len > 0) {
            int n = this.writeAtMost(b, off, len);
            off += n;
            len -= n;
            this.dirty_ = true;
            syncNeeded_ = true;
        }
    }

    /*
     * Write at most "len" bytes to "b" starting at position "off", and return
     * the number of bytes written.
     */
    private int writeAtMost(byte[] b, int off, int len) throws IOException {
        if (this.curr_ >= this.hi_) {
            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                // at EOF -- bump "hi"
                this.hi_ = this.maxHi_;
            } else {
                // slow path -- write current buffer; read next one
                this.seek(this.curr_);
                if (this.curr_ == this.hi_) {
                    // appending to EOF -- bump "hi"
                    this.hi_ = this.maxHi_;
                }
            }
        }
        len = Math.min(len, (int) (this.hi_ - this.curr_));
        int buffOff = (int) (this.curr_ - this.lo_);
        System.arraycopy(b, off, this.buff_, buffOff, len);
        this.curr_ += len;
        return len;
    }
}
FileUtil 类,提供操作工具类
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Copyright:
 * Project:
 * Author: diaoby
 * Created: 2019/7/9 13:43
 * Description: file helper utilities.
 */
public class FileUtil {

    /**
     * Reads up to {@code num} non-blank lines from {@code file} through a
     * {@code BufferedRandomAccessFile}, starting at byte offset {@code pos}
     * (recommended way of reading large files chunk by chunk).
     *
     * <p>Blank lines are skipped instead of being treated as end of input, so
     * a returned chunk is shorter than {@code num} only when the end of the
     * file was reached (or an error occurred).
     *
     * <p>Errors are printed and swallowed; in that case the result holds the
     * lines read so far and has no {@code "pos"} entry.
     *
     * @param file     source file
     * @param encoding name of the file's character encoding
     * @param pos      byte offset to start reading from
     * @param num      maximum number of lines to return
     * @return a map with {@code "pins"} (the decoded lines, a
     *         {@code List<String>}) and {@code "pos"} (the byte offset right
     *         after the last line consumed, a {@code Long})
     */
    public static Map<String, Object> BufferedRandomAccessFileReadLine(File file, String encoding, long pos, int num) {
        Map<String, Object> res = Maps.newHashMap();
        List<String> pins = Lists.newArrayList();
        res.put("pins", pins);
        try (BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file, "r")) {
            // Resolve the charset once instead of looking it up by name for every line.
            Charset charset = Charset.forName(encoding);
            reader.seek(pos);
            while (pins.size() < num) {
                String raw = reader.readLine();
                if (raw == null) {
                    // Only null means end of file.
                    break;
                }
                if (StringUtils.isBlank(raw)) {
                    // A blank line in the middle of the file must not end the chunk:
                    // callers interpret a short chunk as "file finished".
                    continue;
                }
                // readLine() maps each byte to one char (ISO-8859-1 style), so go back
                // to the raw bytes and decode them with the real file encoding.
                pins.add(new String(raw.getBytes(StandardCharsets.ISO_8859_1), charset));
            }
            res.put("pos", reader.getFilePointer());
        } catch (Exception e) {
            // Best-effort contract kept from the original: report and return what we have.
            e.printStackTrace();
        }
        return res;
    }
}
RandomAccessFileTest 单元测试类
package file.service;

import com.huaxin.gxgc.csv.housingflow.HousingFlowDao;
import com.huaxin.gxgc.file.service.FileUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Copyright: 华信软件
 * Project: gx-pms
 * Author: diaoby
 * Created: 2019/7/9 13:46
 * Description: test class that reads the CSV file chunk by chunk.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext-test.xml"})
public class RandomAccessFileTest {
    private static final String ENCODING = "GBK";
    /** Number of lines requested per chunk. */
    private static final int NUM = 10000;
    /** Number of columns a valid CSV row must have. */
    private static final int COLUMN_COUNT = 20;
    private static File csvFile = new File("F://网管LTE小区流量(24-26号).csv");

    @Autowired
    private HousingFlowDao housingFlowDao;

    /**
     * Reads the CSV file in chunks of {@link #NUM} lines through
     * {@code FileUtil.BufferedRandomAccessFileReadLine} and prints the elapsed
     * time in seconds.
     */
    @Test
    public void testBufferedRandomAccessRead() {
        long start = System.currentTimeMillis();
        System.out.println(String.valueOf(csvFile.exists()));
        long pos = 0L;
        while (true) {
            Map<String, Object> res = FileUtil.BufferedRandomAccessFileReadLine(csvFile, ENCODING, pos, NUM);
            // Stop when nothing came back at all.
            if (MapUtils.isEmpty(res)) {
                break;
            }
            @SuppressWarnings("unchecked")
            List<String> pins = (List<String>) res.get("pins");
            if (CollectionUtils.isEmpty(pins)) {
                break;
            }

            // limit -1 keeps trailing empty fields; without it a row whose last
            // columns are empty splits into fewer than 20 parts and is dropped.
            List<String[]> collect = pins.parallelStream()
                    .map(v -> v.split(",", -1))
                    .filter(v -> v.length == COLUMN_COUNT)
                    .collect(Collectors.toList());
            // housingFlowDao.batchDelHousingFlow(collect);
            // housingFlowDao.batchSaveHousingFlow(collect);

            // Decide whether to continue only AFTER the chunk has been handled,
            // otherwise the final, partially filled chunk is silently skipped.
            Object next = res.get("pos");
            if (pins.size() < NUM || !(next instanceof Long)) {
                // Short chunk: end of file. Missing "pos": the reader reported an error.
                break;
            }
            pos = (Long) next;
        }
        System.out.println((System.currentTimeMillis() - start) / 1000);
    }
}
HousingFlowDao 类
package com.huaxin.gxgc.csv.housingflow;

import com.huaxin.acws.common.dao.BaseDao;
import com.huaxin.acws.common.util.SpringUtil;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * Copyright:
 * Project:
 * Author: diaoby
 * Created: 2019/7/9 14:44
 * Description: DAO for the cell traffic table BO_HOUSING_FLOW.
 */
@Repository
@Transactional
public class HousingFlowDao extends BaseDao {

    private JdbcTemplate jdbcTemplate = (JdbcTemplate) SpringUtil.getBean("jdbcTemplate");

    /**
     * Name of the cell traffic table.
     *
     * @return the table name
     */
    @Override
    protected String getTableName() {
        return "BO_HOUSING_FLOW";
    }

    /**
     * Inserts all rows of {@code list} into BO_HOUSING_FLOW as one JDBC batch.
     *
     * <p>Each row must hold 20 values in the column order of the INSERT
     * statement. An empty string is stored as SQL NULL for the date and the
     * numeric columns.
     *
     * @param list rows to insert, one {@code String[20]} per row
     * @throws RuntimeException      if a non-empty CREATE_DATE is not in {@code yyyy/MM/dd} form
     * @throws NumberFormatException if a non-empty numeric value (other than AVG_AVG) is not a number
     */
    public void batchSaveHousingFlow(final List<String[]> list) {
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
        String insertSql = "INSERT INTO BO_HOUSING_FLOW(" +
                "CREATE_DATE," +
                "LTE_TDD," +
                "CITY," +
                "ENBID," +
                "HOUSING_STATE," +
                "E_NODEB," +
                "NULL_UP_BYTES," +
                "NULL_DOWN_BYTES," +
                "COVER_TYPE," +
                "CGI," +
                "STATE," +
                "COVER_SCENE," +
                "LONGITUDE," +
                "DIMENSION," +
                "U_NII," +
                "AREA_ATTRIBUTES," +
                "AVG_AVG," +
                "AVG_MAX," +
                "RCP_AVG," +
                "RCP_MAX )" +
                " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
                " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
        jdbcTemplate.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                String[] values = list.get(i);

                // 1: CREATE_DATE
                if ("".equals(values[0])) {
                    ps.setNull(1, java.sql.Types.TIMESTAMP);
                } else {
                    try {
                        ps.setTimestamp(1, new java.sql.Timestamp(sdf.parse(values[0]).getTime()));
                    } catch (ParseException e) {
                        throw new RuntimeException(e);
                    }
                }

                // 2-6: LTE_TDD .. E_NODEB
                for (int col = 2; col <= 6; col++) {
                    ps.setString(col, values[col - 1]);
                }

                // 7-8: NULL_UP_BYTES, NULL_DOWN_BYTES
                setNullableDouble(ps, 7, values[6]);
                setNullableDouble(ps, 8, values[7]);

                // 9-12: COVER_TYPE .. COVER_SCENE
                for (int col = 9; col <= 12; col++) {
                    ps.setString(col, values[col - 1]);
                }

                // 13-14: LONGITUDE, DIMENSION
                setNullableDouble(ps, 13, values[12]);
                setNullableDouble(ps, 14, values[13]);

                // 15-16: U_NII, AREA_ATTRIBUTES
                ps.setString(15, values[14]);
                ps.setString(16, values[15]);

                // 17: AVG_AVG -- unparsable values are tolerated here: the row is
                // printed and the column is stored as NULL. The parameter must be
                // bound explicitly; leaving it unset lets a batched statement carry
                // over the value bound for the previous row.
                try {
                    setNullableDouble(ps, 17, values[16]);
                } catch (NumberFormatException e) {
                    System.out.println(Arrays.toString(values));
                    ps.setNull(17, java.sql.Types.DOUBLE);
                }

                // 18-20: AVG_MAX, RCP_AVG, RCP_MAX
                setNullableDouble(ps, 18, values[17]);
                setNullableDouble(ps, 19, values[18]);
                setNullableDouble(ps, 20, values[19]);
            }

            @Override
            public int getBatchSize() {
                return list.size();
            }
        });
    }

    /**
     * Binds {@code raw} as a DOUBLE parameter, or SQL NULL when it is the empty string.
     *
     * @throws NumberFormatException if {@code raw} is non-empty and not a number
     */
    private static void setNullableDouble(PreparedStatement ps, int index, String raw) throws SQLException {
        if ("".equals(raw)) {
            ps.setNull(index, java.sql.Types.DOUBLE);
        } else {
            ps.setDouble(index, Double.parseDouble(raw));
        }
    }

    /**
     * Batch delete: removes, for every row in {@code list}, the records whose
     * LTE_TDD equals the row's second value ({@code values[1]}).
     *
     * @param list rows whose second element is the LTE_TDD key to delete
     */
    public void batchDelHousingFlow(final List<String[]> list) {
        String deleteSql = "DELETE FROM BO_HOUSING_FLOW T WHERE LTE_TDD = ?";
        jdbcTemplate.batchUpdate(deleteSql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                String[] values = list.get(i);
                ps.setString(1, values[1]);
            }

            @Override
            public int getBatchSize() {
                return list.size();
            }
        });
    }
}
applicationContext-test.xml 文件就不贴了,里面只配置了数据源、housingFlowDao 和 jdbcTemplate 这几个 bean。
85 万行的 CSV 数据居然执行了 52 秒;如果不插入数据库,只从 CSV 读取完全部数据仅需 9 秒。