zoukankan      html  css  js  c++  java
  • 大数据导入--cvs文本等格式数据

    最近公司需要从别的系统同步数据,不走接口,只提供上传csv文本数据。数据量每次百万级的

    通过缓存流读取数据,这边没有性能问题,但是通过spring 批量操作保存到数据库存在严重性能问题,按1W提交一次全部执行完居然要5个小时。

    见代码 

    BufferedRandomAccessFile 类
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.Arrays;
    
    /**
     * 版权所有:
     * 项目名称:
     * 创建者: diaoby
     * 创建日期:
     * 文件说明: 
     */
    public class BufferedRandomAccessFile extends RandomAccessFile {
        static final int LogBuffSz_ = 16; // 64K buffer
        public static final int BuffSz_ = (1 << LogBuffSz_);
        static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
    
        private String path_;
    
        /*
         * This implementation is based on the buffer implementation in Modula-3's
         * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
         */
        private boolean dirty_; // true iff unflushed bytes exist
        private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
        private long curr_; // current position in file
        private long lo_, hi_; // bounds on characters in "buff"
        private byte[] buff_; // local buffer
        private long maxHi_; // this.lo + this.buff.length
        private boolean hitEOF_; // buffer contains last file block?
        private long diskPos_; // disk position
    
        /*
         * To describe the above fields, we introduce the following abstractions for
         * the file "f":
         *
         * len(f) the length of the file curr(f) the current position in the file
         * c(f) the abstract contents of the file disk(f) the contents of f's
         * backing disk file closed(f) true iff the file is closed
         *
         * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
         * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
         * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
         * operation has the effect of making "disk(f)" identical to "c(f)".
         *
         * A file is said to be *valid* if the following conditions hold:
         *
         * V1. The "closed" and "curr" fields are correct:
         *
         * f.closed == closed(f) f.curr == curr(f)
         *
         * V2. The current position is either contained in the buffer, or just past
         * the buffer:
         *
         * f.lo <= f.curr <= f.hi
         *
         * V3. Any (possibly) unflushed characters are stored in "f.buff":
         *
         * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
         *
         * V4. For all characters not covered by V3, c(f) and disk(f) agree:
         *
         * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
         * disk(f)[i])
         *
         * V5. "f.dirty" is true iff the buffer contains bytes that should be
         * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
         *
         * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
         *
         * V6. this.maxHi == this.lo + this.buff.length
         *
         * Note that "f.buff" can be "null" in a valid file, since the range of
         * characters in V3 is empty when "f.lo == f.curr".
         *
         * A file is said to be *ready* if the buffer contains the current position,
         * i.e., when:
         *
         * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
         *
         * When a file is ready, reading or writing a single byte can be performed
         * by reading or writing the in-memory buffer without performing a disk
         * operation.
         */
    
        /**
         * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
         * in mode <code>mode</code>, which should be "r" for reading only, or
         * "rw" for reading and writing.
         */
        public BufferedRandomAccessFile(File file, String mode) throws IOException {
            this(file, mode, 0);
        }
    
        public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
            super(file, mode);
            path_ = file.getAbsolutePath();
            this.init(size);
        }
    
        /**
         * Open a new <code>BufferedRandomAccessFile</code> on the file named
         * <code>name</code> in mode <code>mode</code>, which should be "r" for
         * reading only, or "rw" for reading and writing.
         */
        public BufferedRandomAccessFile(String name, String mode) throws IOException {
            this(name, mode, 0);
        }
    
        public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
            super(name, mode);
            path_ = name;
            this.init(size);
        }
    
        private void init(int size) {
            this.dirty_ = false;
            this.lo_ = this.curr_ = this.hi_ = 0;
            this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
            this.maxHi_ = (long) BuffSz_;
            this.hitEOF_ = false;
            this.diskPos_ = 0L;
        }
    
        public String getPath() {
            return path_;
        }
    
        public void sync() throws IOException {
            if (syncNeeded_) {
                flush();
                getChannel().force(true);
                syncNeeded_ = false;
            }
        }
    
    //      public boolean isEOF() throws IOException
    //      {
    //          assert getFilePointer() <= length();
    //          return getFilePointer() == length();
    //      }
    
        public void close() throws IOException {
            this.flush();
            this.buff_ = null;
            super.close();
        }
    
        /**
         * Flush any bytes in the file's buffer that have not yet been written to
         * disk. If the file was created read-only, this method is a no-op.
         */
        public void flush() throws IOException {
            this.flushBuffer();
        }
    
        /* Flush any dirty bytes in the buffer to disk. */
        private void flushBuffer() throws IOException {
            if (this.dirty_) {
                if (this.diskPos_ != this.lo_)
                    super.seek(this.lo_);
                int len = (int) (this.curr_ - this.lo_);
                super.write(this.buff_, 0, len);
                this.diskPos_ = this.curr_;
                this.dirty_ = false;
            }
        }
    
        /*
         * Read at most "this.buff.length" bytes into "this.buff", returning the
         * number of bytes read. If the return result is less than
         * "this.buff.length", then EOF was read.
         */
        private int fillBuffer() throws IOException {
            int cnt = 0;
            int rem = this.buff_.length;
            while (rem > 0) {
                int n = super.read(this.buff_, cnt, rem);
                if (n < 0)
                    break;
                cnt += n;
                rem -= n;
            }
            if ((cnt < 0) && (this.hitEOF_ = (cnt < this.buff_.length))) {
                // make sure buffer that wasn't read is initialized with -1
                Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
            }
            this.diskPos_ += cnt;
            return cnt;
        }
    
        /*
         * This method positions <code>this.curr</code> at position <code>pos</code>.
         * If <code>pos</code> does not fall in the current buffer, it flushes the
         * current buffer and loads the correct one.<p>
         *
         * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
         * is at or past the end-of-file, which can only happen if the file was
         * opened in read-only mode.
         */
        public void seek(long pos) throws IOException {
            if (pos >= this.hi_ || pos < this.lo_) {
                // seeking outside of current buffer -- flush and read
                this.flushBuffer();
                this.lo_ = pos & BuffMask_; // start at BuffSz boundary
                this.maxHi_ = this.lo_ + (long) this.buff_.length;
                if (this.diskPos_ != this.lo_) {
                    super.seek(this.lo_);
                    this.diskPos_ = this.lo_;
                }
                int n = this.fillBuffer();
                this.hi_ = this.lo_ + (long) n;
            } else {
                // seeking inside current buffer -- no read required
                if (pos < this.curr_) {
                    // if seeking backwards, we must flush to maintain V4
                    this.flushBuffer();
                }
            }
            this.curr_ = pos;
        }
    
        public long getFilePointer() {
            return this.curr_;
        }
    
        public long length() throws IOException {
            // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
            return Math.max(this.curr_, super.length());
        }
    
        public int read() throws IOException {
            if (this.curr_ >= this.hi_) {
                // test for EOF
                // if (this.hi < this.maxHi) return -1;
                if (this.hitEOF_)
                    return -1;
    
                // slow path -- read another buffer
                this.seek(this.curr_);
                if (this.curr_ == this.hi_)
                    return -1;
            }
            byte res = this.buff_[(int) (this.curr_ - this.lo_)];
            this.curr_++;
            return ((int) res) & 0xFF; // convert byte -> int
        }
    
        public int read(byte[] b) throws IOException {
            return this.read(b, 0, b.length);
        }
    
        public int read(byte[] b, int off, int len) throws IOException {
            if (this.curr_ >= this.hi_) {
                // test for EOF
                // if (this.hi < this.maxHi) return -1;
                if (this.hitEOF_)
                    return -1;
    
                // slow path -- read another buffer
                this.seek(this.curr_);
                if (this.curr_ == this.hi_)
                    return -1;
            }
            len = Math.min(len, (int) (this.hi_ - this.curr_));
            int buffOff = (int) (this.curr_ - this.lo_);
            System.arraycopy(this.buff_, buffOff, b, off, len);
            this.curr_ += len;
            return len;
        }
    
        public void write(int b) throws IOException {
            if (this.curr_ >= this.hi_) {
                if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                    // at EOF -- bump "hi"
                    this.hi_++;
                } else {
                    // slow path -- write current buffer; read next one
                    this.seek(this.curr_);
                    if (this.curr_ == this.hi_) {
                        // appending to EOF -- bump "hi"
                        this.hi_++;
                    }
                }
            }
            this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
            this.curr_++;
            this.dirty_ = true;
            syncNeeded_ = true;
        }
    
        public void write(byte[] b) throws IOException {
            this.write(b, 0, b.length);
        }
    
        public void write(byte[] b, int off, int len) throws IOException {
            while (len > 0) {
                int n = this.writeAtMost(b, off, len);
                off += n;
                len -= n;
                this.dirty_ = true;
                syncNeeded_ = true;
            }
        }
    
        /*
         * Write at most "len" bytes to "b" starting at position "off", and return
         * the number of bytes written.
         */
        private int writeAtMost(byte[] b, int off, int len) throws IOException {
            if (this.curr_ >= this.hi_) {
                if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                    // at EOF -- bump "hi"
                    this.hi_ = this.maxHi_;
                } else {
                    // slow path -- write current buffer; read next one
                    this.seek(this.curr_);
                    if (this.curr_ == this.hi_) {
                        // appending to EOF -- bump "hi"
                        this.hi_ = this.maxHi_;
                    }
                }
            }
            len = Math.min(len, (int) (this.hi_ - this.curr_));
            int buffOff = (int) (this.curr_ - this.lo_);
            System.arraycopy(b, off, this.buff_, buffOff, len);
            this.curr_ += len;
            return len;
        }
    }
    FileUtil 类,提供操作工具类
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import org.apache.commons.io.IOUtils;
    import org.apache.commons.lang3.StringUtils;
    
    import java.io.*;
    import java.util.List;
    import java.util.Map;
    
    /**
     * 版权所有:
     * 项目名称: 
     * 创建者: diaoby
     * 创建日期: 2019/7/913:43
     * 文件说明: 
     */
    public class FileUtil {
        /**
         * 通过BufferedRandomAccessFile读取文件,推荐
         *
         * @param file     源文件
         * @param encoding 文件编码
         * @param pos      偏移量
         * @param num      读取量
         * @return pins文件内容,pos当前偏移量
         */
        public static Map<String, Object> BufferedRandomAccessFileReadLine(File file, String encoding, long pos, int num) {
            Map<String, Object> res = Maps.newHashMap();
            List<String> pins = Lists.newArrayList();
            res.put("pins", pins);
            BufferedRandomAccessFile reader = null;
            try {
                reader = new BufferedRandomAccessFile(file, "r");
                reader.seek(pos);
                for (int i = 0; i < num; i++) {
                    String pin = reader.readLine();
                    if (StringUtils.isBlank(pin)) {
                        break;
                    }
                    pins.add(new String(pin.getBytes("8859_1"), encoding));
                }
                res.put("pos", reader.getFilePointer());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                IOUtils.closeQuietly(reader);
            }
            return res;
        }
    }
    RandomAccessFileTest 单元测试类
    package file.service;
    
    import com.huaxin.gxgc.csv.housingflow.HousingFlowDao;
    import com.huaxin.gxgc.file.service.FileUtil;
    import org.apache.commons.collections.CollectionUtils;
    import org.apache.commons.collections.MapUtils;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import java.io.File;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    /**
     * &#x7248;&#x6743;&#x6240;&#x6709;&#xff1a;&#x534e;&#x4fe1;&#x8f6f;&#x4ef6;
     * &#x9879;&#x76ee;&#x540d;&#x79f0;: gx-pms
     * &#x521b;&#x5efa;&#x8005;: diaoby
     * &#x521b;&#x5efa;&#x65e5;&#x671f;: 2019/7/913:46
     * &#x6587;&#x4ef6;&#x8bf4;&#x660e;: &#x6d4b;&#x8bd5;&#x7c7b;
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {"classpath:applicationContext-test.xml"})
    public class RandomAccessFileTest {
    
        private static final String ENCODING = "GBK";
        private static final int NUM = 10000;
        private static File csvFile = new File("F://网管LTE小区流量(24-26号).csv");
        @Autowired
        private HousingFlowDao housingFlowDao;
    
        /**
         * 测试RandomAccessFile读取文件
         */
        @Test
        public void testBufferedRandomAccessRead() {
            long start = System.currentTimeMillis();
            System.out.println(String.valueOf(csvFile.exists()));
            long pos = 0L;
            while (true) {
                Map<String, Object> res = FileUtil.BufferedRandomAccessFileReadLine(csvFile, ENCODING, pos, NUM);
                // 如果返回结果为空结束循环
                if (MapUtils.isEmpty(res)) {
                    break;
                }
                List<String> pins = (List<String>) res.get("pins");
                if (CollectionUtils.isNotEmpty(pins)) {
                    if (pins.size() < NUM) {
                        break;
                    }
                    List<String[]> collect = pins.parallelStream().map(v -> v.split(",")).filter(v -> v.length == 20).collect(Collectors.toList());
    //                housingFlowDao.batchDelHousingFlow(collect);
    //                housingFlowDao.batchSaveHousingFlow(collect);
                } else {
                    break;
                }
                pos = (Long) res.get("pos");
            }
            System.out.println((System.currentTimeMillis() - start) / 1000);
        }
    
    
    }
    HousingFlowDao 类
    package com.huaxin.gxgc.csv.housingflow;
    
    import com.huaxin.acws.common.dao.BaseDao;
    import com.huaxin.acws.common.util.SpringUtil;
    import org.springframework.jdbc.core.BatchPreparedStatementSetter;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Repository;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * 版权所有:
     * 项目名称: 
     * 创建者: diaoby
     * 创建日期: 2019/7/914:44
     * 文件说明: 小区流量
     */
    @Repository
    @Transactional
    public class HousingFlowDao extends BaseDao {
    
        private JdbcTemplate jdbcTemplate = (JdbcTemplate) SpringUtil.getBean("jdbcTemplate");
        /**
         * 小区流量表
         * @return
         */
        @Override
        protected String getTableName() {
            return "BO_HOUSING_FLOW";
        }
    
        /**
         *
         * @param list
         */
        public void batchSaveHousingFlow(final List<String[]> list){
            final SimpleDateFormat sdf =new SimpleDateFormat("yyyy/MM/dd");
            String insertSql =
                    "INSERT INTO BO_HOUSING_FLOW(" +
                            "CREATE_DATE," +
                            "LTE_TDD," +
                            "CITY," +
                            "ENBID," +
                            "HOUSING_STATE," +
                            "E_NODEB," +
                            "NULL_UP_BYTES," +
                            "NULL_DOWN_BYTES," +
                            "COVER_TYPE," +
                            "CGI," +
                            "STATE," +
                            "COVER_SCENE," +
                            "LONGITUDE," +
                            "DIMENSION," +
                            "U_NII," +
                            "AREA_ATTRIBUTES," +
                            "AVG_AVG," +
                            "AVG_MAX," +
                            "RCP_AVG," +
                            "RCP_MAX )"+
                            " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
                            "          ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    
            jdbcTemplate.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i)
                        throws SQLException {
                    String[] values = list.get(i);
                    ps.setString(1, values[0] + "");
                    if(values[0] .equals("")){
                        ps.setNull(1, java.sql.Types.TIMESTAMP);
                    }else{
                        try {
                            ps.setTimestamp(1, new java.sql.Timestamp(sdf.parse(values[0]).getTime()));
                        } catch (ParseException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    ps.setString(2, values[1] + "");
                    ps.setString(3, values[2] + "");
                    ps.setString(4, values[3] + "");
                    ps.setString(5, values[4] + "");
                    ps.setString(6, values[5] + "");
                    if ("".equals(values[6])) {
                        ps.setNull(7, java.sql.Types.DOUBLE);
                    } else {
                        ps.setDouble(7, Double.valueOf(values[6]));
                    }
                    if ("".equals(values[7])) {
                        ps.setNull(8, java.sql.Types.DOUBLE);
                    } else {
                        ps.setDouble(8, Double.valueOf(values[7]));
                    }
                    ps.setString(9, values[8] + "");
                    ps.setString(10, values[9] + "");
                    ps.setString(11, values[10] + "");
                    ps.setString(12, values[11] + "");
                    if ("".equals(values[12])) {
                        ps.setNull(13, java.sql.Types.DOUBLE);
                    } else {
                        ps.setDouble(13, Double.valueOf(values[12]));
                    }
                    if ("".equals(values[13])) {
                        ps.setNull(14, java.sql.Types.DOUBLE);
                    } else {
                        ps.setDouble(14, Double.valueOf(values[13]));
                    }
                    ps.setString(15, values[14] + "");
                    ps.setString(16, values[15] + "");
                    try{
                        if ("".equals(values[16])) {
                            ps.setNull(17, java.sql.Types.DOUBLE);
                        } else {
                            ps.setDouble(17, Double.valueOf(values[16]));
                        }
                    }catch (Exception e){
                        System.out.println(Arrays.toString(values));
                    }
                    if ("".equals(values[17])) {
                        ps.setNull(18, java.sql.Types.DOUBLE);
                    } else {
                        ps.setDouble(18, Double.valueOf(values[17]));
                    }
                    if ("".equals(values[18])) {
                        ps.setNull(19, java.sql.Types.DOUBLE);
                    } else {
                        ps.setDouble(19, Double.valueOf(values[18]));
                    }
                    if ("".equals(values[19])) {
                        ps.setNull(20, java.sql.Types.DOUBLE);
                    } else {
                        ps.setDouble(20, Double.valueOf(values[19]));
                    }
                }
    
                @Override
                public int getBatchSize() {
                    return list.size();
                }
            });
        }
        /**
         * 批量删除
         * @param list
         */
        public void batchDelHousingFlow(final List<String[]> list){
            String deleteSql = "DELETE FROM BO_HOUSING_FLOW T WHERE LTE_TDD = ?";
            jdbcTemplate.batchUpdate(deleteSql, new BatchPreparedStatementSetter() {
                /**
                 *
                 * @param ps
                 * @param i
                 * @throws SQLException
                 */
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    String[] values = list.get(i);
                    ps.setString(1, values[1]);
                }
    
                /**
                 *
                 * @return
                 */
                @Override
                public int getBatchSize() {
                    return list.size();
                }
            });
        }
    
    
    }
    applicationContext-test.xml 文件就不贴了,就配置了数据源 housingFlowDao jdbcTemplate bean
    csv 85w数据居然执行了52秒,不插入数据库 从csv读取完数据只需要9秒
    
    
  • 相关阅读:
    Linux磁盘空间被未知资源耗尽
    磁盘的分区、格式化、挂载(转)
    sp_MSforeachtable和sp_MSforeachdb
    分布式缓存系统 Memcached 【转载】
    在性能计数的时候使用StopWatch类型
    数据库设计阶段中为何要考虑“反规范化”
    再谈谈数据库镜像之客户端重定向
    当SQL Server排序时遇上了NULL值
    ArrayList的动态扩展
    SSMS 2008的智能感知仅仅针对SQL Server 2008的数据库有效
  • 原文地址:https://www.cnblogs.com/diaobiyong/p/11161737.html
Copyright © 2011-2022 走看看