zoukankan      html  css  js  c++  java
  • 使用copy函数完成数据库迁移

    最近在该一个迁移工具的迁移方式,从ora8迁移到postgresql使用原来的插入迁移速度太慢了,老板说让使用缓存迁移,即使用postgresql的copy函数,因此去pg官网查阅了相关资料,我们需要迁移的数据量大约有3000万条,需要时间在半个小时之内,这个迁移第一步先把相关的表结构迁移过去,然后开始导入数据,下面是我缓存迁移写的一段代码,核心都在里面:

      1     private boolean toPostgresql(int tableIndex) throws SQLException, IOException
      2         {
      3             int success_flag = 0; // 设置成功标志
      4 
      5             boolean isHasError = false;
      6 
      7             TableConfig tableConfig = (TableConfig) tableConfigList.get(tableIndex);
      8             ITable destTable = tableConfig.getDestTable(); // 可以通过这个获得目的表的表名字
      9             ITable srITable = tableConfig.getDestTable();
     10             // srITable.
     11             String dest_tablename = destTable.getName().toString();
     12             String dest_schema = destTable.getSchemaName();
     13 
     14             Statement stmt = null;
     15             ResultSet rs = null;
     16             
     17             Setfetchsize s1=new Setfetchsize();
     18             try
     19                 {
     20                     s1.getxml_setfetchsize();
     21                 } catch (ParserConfigurationException e)
     22                 {
     23                     // TODO Auto-generated catch block
     24                     e.printStackTrace();
     25                 } catch (SAXException e)
     26                 {
     27                     // TODO Auto-generated catch block
     28                     e.printStackTrace();
     29                 }
     30             int stmt_setfetchsize = s1.getSetfetchsize_num();// 设置setfetchsize
     31             // 取值大小
     32             stmt = srcConn.createStatement();
     33             stmt.setFetchSize(stmt_setfetchsize);
     34             rs = getSrcResultSet(tableConfig, stmt);
     35             ResultSetMetaData rsmd = rs.getMetaData();// 为了获得字段用的
     36             // System.out.println("缓存setfetchsize设置为:"+stmt_setfetchsize);
     37             int columnCount = rsmd.getColumnCount(); // 获得字段的个数
     38             StringBuilder sbuild = new StringBuilder();
     39             //System.out.println("stmt_setfetchsize实际取值为:"+stmt.getFetchSize());
     40             /**
     41              * 把结果集的数据拼接成字符串语句,保存到sbulid中 大小受缓存大小影响
     42              */
     43             KBCopyOutputStream kb_output = new KBCopyOutputStream((BaseConnection) destConn, "COPY " + dest_schema + "." + dest_tablename + "  FROM STDIN");
     44 
     45             while (rs.next())
     46                 {
     47 
     48                     row_totalnum += 1; // 记录总数据的总行数
     49                     for (int i = 1; i <= columnCount; i++)
     50                         {
     51                             String val = rs.getString(i);
     52                             int coltype = rsmd.getColumnType(i);
     53                             // 对字符字段的转义字串进行处理,使其转义效果失效
     54                             if (coltype == Types.CHAR || coltype == Types.VARCHAR || coltype == Types.NCHAR || coltype == Types.NVARCHAR || coltype == Types.LONGVARCHAR)
     55                                 {
     56                                     int valen = val.length();
     57                                     sbuild.ensureCapacity(valen + 4);
     58                                     for (int j = 0; j < valen; j++)
     59                                         {
     60                                             char ch = val.charAt(j);
     61                                             switch (ch)
     62                                             {
     63                                                 case '	' :
     64                                                     sbuild.append("\t");
     65                                                     break;
     66                                                 case '
    ' :
     67                                                     sbuild.append("\n");
     68                                                     break;
     69                                                 case '
    ' :
     70                                                     sbuild.append("\r");
     71                                                     break;
     72                                                 case '\' :
     73                                                     sbuild.append("\\");
     74                                                     break;
     75                                                 default :
     76                                                     sbuild.append(ch);
     77                                             }
     78                                         }
     79                                 } else
     80                                     sbuild.append(val);
     81 
     82                             if (i < columnCount)
     83                                 sbuild.append('	');
     84                         }
     85                     sbuild.append('
    ');
     86 
     87                     String s = sbuild.toString();
     88                     byte[] bytes = s.getBytes("UTF-8");
     89                     kb_output.write(bytes);
     90                     if (row_totalnum % 10 == 0)
     91                         {
     92                             successRowNum = 0;
     93                             addToSuccessNum(row_totalnum);
     94                         }
     95                     // System.out.print(s);
     96                     sbuild.setLength(0);
     97                 }
     98             kb_output.close();
     99             success_flag = 1;
    100             destConn.commit();
    101 
    102             if (success_flag == 1)
    103                 {
    104                     writeFinishResult(srITable.getFullName(), destTable.getFullName(), row_totalnum, row_totalnum, 0);
    105                     successRowNum = 0;
    106                     addToSuccessNum(row_totalnum);
    107                 } else
    108                     {
    109                         writeFinishResult(srITable.getFullName(), destTable.getFullName(), row_totalnum, 0, row_totalnum);
    110                         addToErrorNum(row_totalnum); // copyin执行失败返回错误数
    111                     }
    112             writeSteps(" ");
    113             return !isHasError;
    114         }

    我们的setfetchsize可以自己设置一下,这个目的是减少来回访问oracle拿取数据的开销,具体用法可以查看相关资料,主要是访问的数据量大的时候用的。上面的代码有个小缺陷,就是我把处理过的数据都放到一个stringbuilder里了,当数据量很大超过内存时候,估计就跑不动了,你可以对stringbuilder里的数据进行大小判断一下,当满足的时候commit一下啊就行,这个数据迁移是在一个事务中进行的,所以最后有个commit提交!

  • 相关阅读:
    Java实现 LeetCode 343 整数拆分(动态规划入门经典)
    Java实现 LeetCode 342 4的幂
    Java实现 LeetCode 342 4的幂
    Java实现 LeetCode 342 4的幂
    Java实现 LeetCode 341 扁平化嵌套列表迭代器
    Java实现 LeetCode 341 扁平化嵌套列表迭代器
    Java实现 LeetCode 341 扁平化嵌套列表迭代器
    Java实现 LeetCode 338 比特位计数
    H264(NAL简介与I帧判断)
    分享一段H264视频和AAC音频的RTP封包代码
  • 原文地址:https://www.cnblogs.com/codeblock/p/4680280.html
Copyright © 2011-2022 走看看