zoukankan      html  css  js  c++  java
  • java多线程向数据库中加载数据

    读取本地文件,每行为一条记录,文件大小550M,200万条数据。先将文件读取到内存中,再开启6个线程连接postgresql不同coordinator端口导入数据。代码如下:

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    
    
    /**
     * Loads a comma-separated text file into a PostgreSQL table through JDBC batch inserts.
     *
     * <p>{@link #main(String[])} reads the whole file into memory and then starts one worker
     * thread per configured port. Each worker opens its own connection, inserts the lines it
     * was given in batches, and prints throughput figures to standard output.
     *
     * <p>Instances are configured through {@link #setPramater} before being handed to a
     * {@link Thread}; the row list is only read by the workers, never modified.
     */
    public class InsertThread implements Runnable {

        /** A line is inserted only when it splits into exactly this many fields. */
        private static final int COLUMN_COUNT = 38;

        /** Rows added to a batch before it is executed and committed. */
        private static final int BATCH_SIZE = 10000;

        /** Ports to connect to; one worker thread is started per entry. */
        private static final String[] PORTS = { "2341", "2342", "2343", "2344", "2345", "2346" };

        private String tb;
        private String ip;
        private String port;
        private List<String> list;

        /**
         * Entry point.
         *
         * @param args {@code args[0]} path of the file to load, {@code args[1]} target table
         *             name, {@code args[2]} last octet of the server address (it is appended
         *             to {@code 192.168.8.} when the JDBC URL is built)
         */
        public static void main(String[] args) {
            if (args == null || args.length < 3) {
                System.err.println("usage: InsertThread <file> <table> <last-ip-octet>");
                return;
            }
            String file = args[0];
            String tb = args[1];
            String ip = args[2];

            List<String> list;
            try {
                // Load the source data.
                list = getContent(file);
            } catch (Exception e) {
                // Without the data there is nothing for the workers to do.
                e.printStackTrace();
                return;
            }
            System.out.println(list.size());

            // NOTE(review): every worker is handed the same complete list, so each row of the
            // file is submitted once per port. If the intent was to split the load across the
            // ports, give each worker its own slice instead - confirm with the author.
            Thread[] workers = new Thread[PORTS.length];
            for (int i = 0; i < PORTS.length; i++) {
                InsertThread task = new InsertThread();
                task.setPramater(tb, ip, PORTS[i], list);
                workers[i] = new Thread(task);
            }
            // Start only after all workers have been created, as before.
            for (int i = 0; i < workers.length; i++) {
                workers[i].start();
            }
        }

        /**
         * Configures this worker. Must be called before {@link #run()}.
         *
         * @param tb   name of the table to insert into (concatenated into the SQL text)
         * @param ip   last octet of the server address
         * @param port port to connect to
         * @param list lines to insert, one comma-separated record per element
         */
        public void setPramater(String tb, String ip, String port, List<String> list) {
            this.tb = tb;
            this.ip = ip;
            this.port = port;
            this.list = list;
        }

        /**
         * Opens a connection, inserts every well-formed line of the configured list in batches
         * and prints throughput statistics. Failures are reported on standard error; the
         * statement and the connection are always closed before returning.
         */
        @Override
        public void run() {
            if (list == null) {
                System.err.println("InsertThread: no rows configured, nothing to insert");
                return;
            }
            Connection conn = openConnection();
            if (conn == null) {
                // The reason has already been printed by openConnection().
                return;
            }
            PreparedStatement ps = null;
            try {
                conn.setAutoCommit(false);
                // A table name cannot be bound as a parameter, so it is concatenated.
                // NOTE(review): tb comes straight from the command line - only pass trusted values.
                String sql = "insert into "
                        + tb
                        + " values(?,?,?,?,?,?,?::timestamptz,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
                ps = conn.prepareStatement(sql);
                insertAll(conn, ps);
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                if (ps != null) {
                    try {
                        ps.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        /** Loads the driver and connects; returns {@code null} after printing the cause on failure. */
        private Connection openConnection() {
            try {
                Class.forName("org.postgresql.Driver");
                String url = "jdbc:postgresql://192.168.8." + ip + ":" + port + "/postgres";
                return DriverManager.getConnection(url, "postgres", "postgres");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return null;
        }

        /** Binds and batches every line with the expected field count, then prints the totals. */
        private void insertAll(Connection conn, PreparedStatement ps) {
            long beginTime = System.currentTimeMillis();
            long batchStart = beginTime;
            // Counts lines that passed the field-count check, including any whose batch
            // later failed and was rolled back.
            int count = 0;

            for (int i = 0; i < list.size(); i++) {
                // Limit -1 keeps trailing empty fields so the field count stays reliable.
                String[] fields = list.get(i).split(",", -1);
                if (fields.length != COLUMN_COUNT) {
                    continue;
                }
                count++;
                try {
                    for (int j = 0; j < fields.length; j++) {
                        ps.setString(j + 1, fields[j].trim());
                    }
                    ps.addBatch();
                    // Other batch sizes (50, 100, 500, 1000, ...) can be tried here.
                    if (count % BATCH_SIZE == 0) {
                        ps.executeBatch();
                        conn.commit();
                        ps.clearBatch();
                        long now = System.currentTimeMillis();
                        System.out.println("-----------------" + count);
                        System.out.println("导入1万条数据性能" + rowsPerSecond(BATCH_SIZE, now - batchStart));
                        batchStart = now;
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                    discardFailedBatch(conn, ps);
                }
            }

            // Flush whatever is left over from the last, partially filled batch.
            try {
                ps.executeBatch();
                conn.commit();
                ps.clearBatch();
            } catch (SQLException e) {
                e.printStackTrace();
                discardFailedBatch(conn, ps);
            }

            long elapsed = System.currentTimeMillis() - beginTime;
            System.out.println("pst+batch:" + count + "条");
            System.out.println("pst+batch:" + elapsed / 1000 + "秒");
            System.out.println("pst+batch:" + rowsPerSecond(count, elapsed) + "条/秒");
        }

        /**
         * Rolls back and empties the pending batch after a failure. With auto-commit off, the
         * server refuses further statements in a failed transaction until it is rolled back,
         * so without this every later batch on the connection would fail as well.
         */
        private static void discardFailedBatch(Connection conn, PreparedStatement ps) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            try {
                ps.clearBatch();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /**
         * Rows per second computed in {@code long} arithmetic. An elapsed time below one
         * millisecond is treated as one millisecond so the division can never be by zero.
         */
        private static long rowsPerSecond(long rows, long elapsedMillis) {
            return rows * 1000L / Math.max(1L, elapsedMillis);
        }

        /**
         * Reads a UTF-8 text file fully into memory.
         *
         * @param file path of the file to read
         * @return the lines of the file in order, without line terminators; empty if the file
         *         is empty
         * @throws Exception if the file cannot be opened or read
         */
        public static List<String> getContent(String file) throws Exception {
            FileInputStream in = new FileInputStream(file);
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
                List<String> lines = new ArrayList<String>();
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
                return lines;
            } finally {
                // Closing the underlying stream releases the file handle even when reading fails.
                in.close();
            }
        }
    }
    

      



  • 相关阅读:
    Erlang 督程 启动和结束子进程
    cocos2d-x 3.0 内存管理机制
    c语言基本数据类型
    4星|《剑桥中国经济史:古代到19世纪》:经济学视角看中国古代史
    孟晚舟三种结局;共享单车大败局;失业潮不会来:4星|《财经》2018年第30期
    2018左其盛差评榜,罕见的差书榜
    2018左其盛好书榜,没见过更好的榜单
    罗振宇时间的朋友2018跨年演讲中最重要的35句话
    中国土地制度与房价走势相关9本书
    2星|水木然《世界在变软》:肤浅的朋友圈鸡汤文
  • 原文地址:https://www.cnblogs.com/xiaodf/p/5027193.html
Copyright © 2011-2022 走看看