zoukankan      html  css  js  c++  java
  • 从文件导数据到数据库的性能优化思路(笔记)

    概述

         最近公司一.NET项目需要对其日志Log入数据库统计,写这个脚本导入的任务便落到我身上了。采用了熟练的Java,这个日志也不是很大,一个文件大概几兆,有上万条数据,一天大概有7,8个文件需要这样的导入处理。由于之前写Web没有这么去批处理这么多数据,所以没有太注意性能,第一个版本程序导入速度慢的吓人,一个文件导完可能需要10多分钟,也就是说如果把每天的文件导完可能需要2个多小时的时间,听听就很蛋疼,最终经过优化后,一个文件导入也就几秒,甚至可以更短。目标日志文件的信息都是按行存储,所以程序中按行读取后,然后进行相应的字符串截取入库。下面则为思路分享以及主要代码的分享。
     
    优化思路
         1.程序流程:
           程序先读取本地的文件到内存,然后把内存的数据批量Insert到数据库。
         2.归纳:
         可以看出首先程序需要进行文件IO操作,然后则是数据JDBC操作,所以优化方向大致可以是以下几个:
              a.文件IO优化
              b.JDBC操作优化
              c.使用多线程并行JDBC操作
     文件常见IO简介
         Java的文件读写操作大概有这么几种方式,但是我们应该注意几种文件操作方式的区别,哪些操作方式适合不同的数据文件对象。
         1.(InputStream/OutputStream)    为字节输入/输出流,这种读写方式都是按一定字节量读取数据。
         2. (FileInputStream/FileOutputStream) 此方法继承自上面的(InputStream/OutpustStream),同样按字节流输入/输出,用于读取图像之类的原始字节流
         3.(FileReader/FileWriter) 此方法适用于按字符流的文件操作
         4. (BufferedReader/BufferedWriter) 从字符输入流中读取文本,缓冲各个字符,从而实现字符、数组和行的高效读取。
     
          注:更详细的IO操作说明,请查看具体的JDK文档。
      此处我采用的BufferedReader按行读取,代码片段:
     1     public static List<String> getLogLinesByBuf(String filePath){
     2         
     3         List<String> items = new ArrayList<String>();
     4         File file = new File(filePath);    
     5         BufferedReader reader;
     6         if (file.exists()) {
     7             
     8             try {
     9                 reader = new BufferedReader(new FileReader(file));
    10                 String temp = "";
    11                 while((temp = reader.readLine()) != null) {
    12                     items.add(temp);
    13                 }            
    14                 //close
    15                 reader.close();
    16             } catch (Exception e) {                
    17                 e.printStackTrace();
    18             }
    19         } else {
    20             System.out.println("该路径文件不存在.");
    21         }        
    22         return items;
    23     }
    PreparedStatement和Statement 
            JDBC操作我们经常会用到PreparedStatement和Statement,PreparedStatement相对Statement来讲,PreparedStatement拥有预编译能力,性能更好,2者其它的优缺点比较可以查看相关的资料。另外,平常我们插入数据都是一条,2条,当完成成千上万条数据插入操作的时候,你会看到性能是直线下降的,所以这里会采用sql批处理。
           代码片段:
        
        public static void insertLogInfo(List<String> data) {
            
            String sql = "INSERT INTO log_info(date_time,s_sitename,s_ip,cs_method,cs_uri_stem,cs_uri_query,"
                    + "s_port,cs_username,c_ip,cs_user_agent,sc_status,sc_substatus,sc_win32_status"
                    + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)";
            Connection conn = DBSource.getConnection();
            int count = 0;
            try {
                conn.setAutoCommit(false);
                PreparedStatement prest = conn.prepareStatement(sql);  
                
                for(String str : data) {
                    String[] arr = str.split(" ");
                    prest.setString(1, arr[0]+" "+arr[1]);
                    prest.setString(2, arr[2]);
                    prest.setString(3, arr[3]);
                    prest.setString(4, arr[4]);
                    prest.setString(5, arr[5]);
                    prest.setString(6, arr[6]);
                    prest.setString(7, arr[7]);
                    prest.setString(8, arr[8]);
                    prest.setString(9, arr[9]);
                    prest.setString(10, arr[10]);
                    prest.setString(11, arr[11]);
                    prest.setString(12, arr[12]);
                    prest.setString(13, arr[13]);
                    //添加到批处理
                    prest.addBatch();    
                }
    
                int [] intarr = prest.executeBatch();
                conn.commit();  
                prest.clearBatch();                      
                prest.close();
                conn.close();
                for (int j = 0 ; j < intarr.length; j++) {
                    if (intarr[j] > 0) {
                        count +=1;
                    }
                }    
            } catch (Exception e) {
                System.out.println(new Date().toLocaleString()+":数据库插入操作失败"+e.getMessage());
            }      
            System.out.println("本次操作成功插入"+count+"行数据");    
        }
    View Code
    多线程并行处理
             例如本来1万条数据是一个线程进行JDBC批量提交,现在启用5个线程并行处理,每个线程2000条数据,甚至你可以根据数据量来分配更多线程来完成同步提交,性能提升会比较明显。
           代码片段:
      Thread类: 
      1 package com.xj.dbsource;
      2 
      3 import java.io.File;
      4 
      5 import java.sql.Connection;
      6 import java.sql.DriverManager;
      7 import java.sql.PreparedStatement;
      8 import java.sql.SQLException;
      9 import java.sql.Statement;
     10 import java.util.Date;
     11 import java.util.List;
     12 
     13 import com.json.utils.JsonFileUtils;
     14 import com.xj.iislog.bean.JDBCInfo;
     15 
     16 
     17 
     18 /**
     19  * 
     20  * @author Ziv
     21  * 数据操作源
     22  */
     23 public class DBSource extends Thread {
     24     
     25     //声明对象
     26     private static Statement statement;
     27     //连接对象
     28     private static Connection conn;
     29     
     30     private List<String> data;
     31         
     32     public DBSource(List<String> data) {
     33         super();
     34         this.data = data;
     35     }
     36 
     37     public void run(){
     38         System.out.println(System.currentTimeMillis());
     39         DBSource.insertLogInfo(data);
     40         System.out.println(System.currentTimeMillis());
     41     }
     42     
     43     /**
     44      * 
     45      * @param sql
     46      * @return int
     47      */
     48     @SuppressWarnings("deprecation")
     49     public int insert(String sql) {
     50         
     51         int result = 0;
     52         try {
     53             conn = getConnection();
     54             statement = conn.createStatement();
     55             result = statement.executeUpdate(sql);        
     56             //关闭连接
     57             conn.close();
     58         } catch (SQLException e) {
     59             System.out.println(new Date().toLocaleString()+":数据库插入操作失败" +e.getMessage());
     60         }
     61         
     62         return result;
     63     }
     64     
     65     /**
     66      * prepared方式入库
     67      * @param arr
     68      * @return
     69      * @throws SQLException
     70      */
     71     @SuppressWarnings("deprecation")
     72     public static void insertLogInfo(List<String> data) {
     73         
     74         String sql = "INSERT INTO log_info(date_time,s_sitename,s_ip,cs_method,cs_uri_stem,cs_uri_query,"
     75                 + "s_port,cs_username,c_ip,cs_user_agent,sc_status,sc_substatus,sc_win32_status"
     76                 + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)";
     77         Connection conn = DBSource.getConnection();
     78         int count = 0;
     79         try {
     80             conn.setAutoCommit(false);
     81             PreparedStatement prest = conn.prepareStatement(sql);  
     82             
     83             for(String str : data) {
     84                 String[] arr = str.split(" ");
     85                 prest.setString(1, arr[0]+" "+arr[1]);
     86                 prest.setString(2, arr[2]);
     87                 prest.setString(3, arr[3]);
     88                 prest.setString(4, arr[4]);
     89                 prest.setString(5, arr[5]);
     90                 prest.setString(6, arr[6]);
     91                 prest.setString(7, arr[7]);
     92                 prest.setString(8, arr[8]);
     93                 prest.setString(9, arr[9]);
     94                 prest.setString(10, arr[10]);
     95                 prest.setString(11, arr[11]);
     96                 prest.setString(12, arr[12]);
     97                 prest.setString(13, arr[13]);
     98                 //添加到批处理
     99                 prest.addBatch();    
    100             }
    101 
    102             int [] intarr = prest.executeBatch();
    103             conn.commit();  
    104             prest.clearBatch();                      
    105             prest.close();
    106             conn.close();
    107             for (int j = 0 ; j < intarr.length; j++) {
    108                 if (intarr[j] > 0) {
    109                     count +=1;
    110                 }
    111             }    
    112         } catch (Exception e) {
    113             System.out.println(new Date().toLocaleString()+":数据库插入操作失败"+e.getMessage());
    114         }      
    115         System.out.println("本次操作成功插入"+count+"行数据");    
    116     }
    117     
    118     /**
    119      * 创建连接池
    120      * @return Connection
    121      */
    122     public static Connection getConnection() {
    123         Connection con = null;
    124         try {
    125             //从配置文件中获取jdbc config
    126             JDBCInfo jdbc = JsonFileUtils.readJsonFile(new File("resource/config.json"), JDBCInfo.class);
    127             if (jdbc != null) {
    128                 //mysql驱动加载
    129                 Class.forName(jdbc.getDriver());
    130                 con = DriverManager.getConnection(jdbc.getUrl(),
    131                         jdbc.getUser(), jdbc.getPassword());                            
    132             }
    133         } catch (Exception e) {
    134             System.out.println("数据库连接失败" +e.getMessage());
    135         }
    136         return con;
    137     }
    138     
    139     
    140     /**
    141      * 获取Sql
    142      * @param arr
    143      * @return
    144      */
    145     public String getSql(String[] arr) {
    146         
    147         StringBuffer sql = new StringBuffer("INSERT INTO log_info (");
    148         sql.append("date_time,");
    149         sql.append("s_sitename,");
    150         sql.append("s_ip,");
    151         sql.append("cs_method,");
    152         sql.append("cs_uri_stem,");
    153         sql.append("cs_uri_query,");
    154         sql.append("s_port,");
    155         sql.append("cs_username,");
    156         sql.append("c_ip,");
    157         sql.append("cs_user_agent,");
    158         sql.append("sc_status,");
    159         sql.append("sc_substatus,");
    160         sql.append("sc_win32_status");
    161         sql.append(") VALUES ('");
    162         sql.append(arr[0]+" "+arr[1]);
    163         sql.append("','");
    164         sql.append(arr[2]);
    165         sql.append("','");
    166         sql.append(arr[3]);
    167         sql.append("','");
    168         sql.append(arr[4]);
    169         sql.append("','");
    170         sql.append(arr[5]);
    171         sql.append("','");
    172         sql.append(arr[6]);
    173         sql.append("','");
    174         sql.append(arr[7]);
    175         sql.append("','");
    176         sql.append(arr[8]);
    177         sql.append("','");
    178         sql.append(arr[9]);
    179         sql.append("','");
    180         sql.append(arr[10]);
    181         sql.append("','");
    182         sql.append(arr[11]);
    183         sql.append("','");
    184         sql.append(arr[12]);
    185         sql.append("','");
    186         sql.append(arr[13]);
    187         sql.append("')");
    188         
    189         return sql.toString();
    190     }
    191 }
    View Code
      调用代码:
     1     /**
     2      * 此方法采用递归操作,直至数据全部入库写入完毕
     3      * 同时调用5个线程进行入库操作
     4      * @param data
     5      * @param start
     6      * @param end
     7      */
     8     public static void threadsHandle(List<String> data, int start, int end) {
     9         
    10         int total = data.size();
    11         int size = (int)data.size()/5;
    12         //数据不越界
    13         if (start < total) {
    14             List<String> temp = null;
    15             if (end < total) {
    16                 temp = data.subList(start, end);                        
    17             } else if (end >= total) {
    18                 temp = data.subList(start, total);
    19             }
    20             //执行数据写入
    21             DBSource thread = new DBSource(temp);
    22             thread.start();
    23 
    24             start = end;
    25             end = start+size;
    26             threadsHandle(data, start, end);
    27         }
    28     }
    最终结果:
         原来的12分钟,变成了6秒左右,效率大了一大截。其他朋友如果有更好的建议,可以跟我交流下0.0。下次再把数据弄的更大些。
  • 相关阅读:
    跟我一起学Go系列:gRPC 全局数据传输和超时处理
    跟我一起学Go系列:Go gRPC 安全认证方式-Token和自定义认证
    c++中的继承关系
    数值型模板参数的应用
    [源码解析] 机器学习参数服务器Paracel (3)------数据处理
    [源码解析] PyTorch 分布式(2) --- 数据加载之DataLoader
    [源码解析] PyTorch 分布式(1) --- 数据加载之DistributedSampler
    [源码解析] 机器学习参数服务器 Paracel (2)--------SSP控制协议实现
    [源码解析] 机器学习参数服务器 Paracel (1)-----总体架构
    [源码解析]机器学习参数服务器ps-lite(4) ----- 应用节点实现
  • 原文地址:https://www.cnblogs.com/zivxiaowei/p/3656730.html
Copyright © 2011-2022 走看看