zoukankan      html  css  js  c++  java
  • java多线程向数据库写入数据

    任务: 从sqlserver中将一个表A(约16W条数据)导到mysql中对应的一个表B中。

    思路:分段获取A表中的数据后,用多个线程同时向B表中写入。

    关键代码

    //将数据库中的数据条数分段
    	/**
    	 * Counts the rows of the source table and splits that total into fixed-size segments.
    	 *
    	 * <p>Results are stored in the fields {@code sum} (total row count), {@code n} (number of
    	 * full segments) and {@code residue} (rows left over after the full segments). A
    	 * {@link SQLException} is printed and not rethrown; the fields then keep whatever values
    	 * they held before the failure.
    	 */
    	public void division(){
    		// Rows per segment; each full segment is meant to be handled by one worker thread.
    		final int segmentSize=30000;
    		// Total number of rows to import.
    		String sql3="SELECT  count(*)  FROM [CMD].[dbo].[mycopy1]";
    		try {
    			pss=cons.prepareStatement(sql3);
    			rss=pss.executeQuery();

    			// count(*) yields a single row, so one next() call is enough.
    			if(rss.next()){
    				sum=rss.getInt(1);
    				System.out.println("总记录条数:"+sum);
    			}
    			// Division and modulo also cover sum < segmentSize (n becomes 0, residue becomes
    			// sum), so n is always recomputed instead of keeping a value from an earlier call.
    			n=sum/segmentSize;
    			residue=sum%segmentSize;

    			System.out.println(n+"  "+residue);

    		} catch (SQLException e) {
    			e.printStackTrace();
    		} finally {
    			// Release the cursor and the statement on both the success and the failure path.
    			try {
    				if(rss!=null){
    					rss.close();
    				}
    			} catch (SQLException closeError) {
    				closeError.printStackTrace();
    			}
    			try {
    				if(pss!=null){
    					pss.close();
    				}
    			} catch (SQLException closeError) {
    				closeError.printStackTrace();
    			}
    		}

    	}
    	

    线程类

    /**
     * Stores the row range for this worker and opens one SQL Server and one MySQL connection.
     *
     * <p>Any exception raised while loading a driver or connecting is printed and not rethrown,
     * so the object may be left with some of {@code cons}, {@code stas}, {@code con} and
     * {@code sta} unassigned.
     *
     * @param start first row number of the range (used in a {@code between} clause by getAll)
     * @param end   last row number of the range (used in a {@code between} clause by getAll)
     */
    public MyThread(int start,int end) {
    		this.end=end; 
        	this.start=start;
    		// NOTE(review): this message is printed for every instance created through this
    		// constructor, whatever range it was given.
    		System.out.println("处理掉余数");
    		  try {
    				
    	        	// This runs on the thread that constructs the object, so the name printed is
    	        	// that thread's name.
    	        	System.out.println("--------"+Thread.currentThread().getName()+"------------");
    				// Source side: load the SQL Server driver, connect, create a statement.
    				Class.forName(SQLSERVERDRIVER);
    				System.out.println("加载sqlserver驱动...");
    				cons = DriverManager.getConnection(CONTENTS,UNS,UPS);
    				stas = cons.createStatement();
    				System.out.println("连接SQLServer数据库成功!!");
    				
    				// Target side: load the MySQL driver, connect, create a statement.
    				System.out.println("加载mysql驱动.....");
    				Class.forName(MYSQLDRIVER);
    				con = DriverManager.getConnection(CONTENT,UN,UP);
    				sta = con.createStatement();
    				// Turn off auto-commit so inserts on this connection need an explicit commit.
    				con.setAutoCommit(false);
    				System.out.println("连接mysql数据库成功!!");
    				
    			} catch (Exception e) {
    				// Failure is only reported here; construction still completes.
    				e.printStackTrace();	
    			}
    	}
    
    
    
    
    
    	/**
    	 * Reads the rows numbered {@code start} to {@code end} (inclusive, numbered by
    	 * {@code row_number() over (order by pmcode)}) from the SQL Server source table.
    	 *
    	 * <p>The rows are also stored in the {@code allmembers} field. The result set is closed on
    	 * every path; the shared statement {@code stas} is closed only when the query fails, as
    	 * before.
    	 *
    	 * @return the rows of the range as {@code Member} objects, or {@code null} if a
    	 *         {@link SQLException} occurred (the exception is printed, not rethrown)
    	 */
    	public ArrayList<Member> getAll(){
    		String sql1="select * from (select row_number() over (order by pmcode) as rowNum,*" +
    				" from [CMD].[dbo].[mycopy1]) as t where rowNum between "+start+" and "+end;
    		try {
    			System.out.println("正在获取数据...");
    			allmembers=new ArrayList<Member>();
    			rss=stas.executeQuery(sql1);
    			while(rss.next()){
    				Member member=new Member();
    				member.setAddress1(rss.getString("address1"));
    				member.setBnpoints(rss.getString("bnpoints"));
    				member.setDbno(rss.getString("dbno"));
    				member.setExpiry(rss.getString("expiry"));
    				member.setHispoints(rss.getString("hispoints"));
    				member.setKypoints(rss.getString("kypoints"));
    				member.setLevels(rss.getString("levels"));
    				member.setNames(rss.getString("names"));
    				member.setPmcode(rss.getString("pmcode"));
    				member.setRemark(rss.getString("remark"));
    				member.setSex(rss.getString("sex"));
    				member.setTelephone(rss.getString("telephone"));
    				member.setWxno(rss.getString("wxno"));
    				member.setPmdate(rss.getString("pmdate"));
    				allmembers.add(member);
    			}
    			System.out.println("成功获取sqlserver数据库数据!");
    			return allmembers;

    		} catch (SQLException e) {
    			System.out.println("获取sqlserver数据库数据发送异常!");
    			e.printStackTrace();
    			// Same as before: the statement is given up once a read has failed.
    			try {
    				if(stas!=null){
    					stas.close();
    				}
    			} catch (SQLException closeError) {
    				closeError.printStackTrace();
    			}
    			return null;
    		} finally {
    			// Previously the close code sat after the return and only ran on failure, so the
    			// result set leaked on every successful read.
    			try {
    				if(rss!=null){
    					rss.close();
    				}
    			} catch (SQLException closeError) {
    				closeError.printStackTrace();
    			}
    		}
    	}
    	
    	/**
    	 * Inserts the given rows into the MySQL table {@code test.mycopy2} in one transaction.
    	 *
    	 * <p>Rows are sent with JDBC batching in chunks. On success the transaction is committed,
    	 * {@code flag} is set to {@code false} and the MySQL connection is closed. On a
    	 * {@link SQLException} the error is printed, uncommitted work is rolled back and
    	 * {@code flag} is left unchanged, so a retry starts from a clean transaction.
    	 *
    	 * @param allmembers rows to insert; {@code null} (which getAll returns after a failed read)
    	 *                   is treated as "nothing to write": the connection is closed and
    	 *                   {@code flag} is cleared without inserting anything
    	 */
    	public void inputAll(ArrayList<Member> allmembers){
    		System.out.println("开始向mysql中写入");
    		if(allmembers==null){
    			// The read failed, so there is nothing to write. Stop instead of hitting a
    			// NullPointerException with the connection still open.
    			System.out.println("没有可写入的数据,放弃写入!");
    			this.flag=false;
    			try {
    				con.close();
    			} catch (SQLException closeError) {
    				closeError.printStackTrace();
    			}
    			return;
    		}
    		// Rows sent per executeBatch() call; bounds the number of pending rows held in memory.
    		final int batchSize=1000;
    		String sql2="insert into test.mycopy2 values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
    		boolean committed=false;
    		try {
    			ps=con.prepareStatement(sql2);
    			try {
    				System.out.println("-------------------------等待写入数据条数: "+allmembers.size());
    				int pending=0;
    				for(Member member:allmembers){
    					ps.setString(1, member.getPmcode());
    					ps.setString(2, member.getNames());
    					ps.setString(3, member.getSex());
    					ps.setString(4, member.getTelephone());
    					ps.setString(5, member.getAddress1());
    					ps.setString(6, member.getPmdate());
    					ps.setString(7, member.getExpiry());
    					ps.setString(8, member.getLevels());
    					ps.setString(9, member.getDbno());
    					ps.setString(10, member.getHispoints());
    					ps.setString(11, member.getBnpoints());
    					ps.setString(12, member.getKypoints());
    					ps.setString(13, member.getWxno());
    					ps.setString(14, member.getRemark());
    					// Queue the row instead of executing one statement per row.
    					ps.addBatch();
    					pending++;
    					if(pending==batchSize){
    						ps.executeBatch();
    						pending=0;
    					}
    				}
    				// Flush the last, partially filled chunk.
    				if(pending>0){
    					ps.executeBatch();
    				}
    			} finally {
    				// Release the statement on success and on failure.
    				ps.close();
    			}
    			con.commit();
    			// The data is committed from here on, so mark the work as done before closing:
    			// a failing close() must not cause the rows to be written a second time.
    			committed=true;
    			this.flag=false;

    			con.close();
    			System.out.println(Thread.currentThread().getName()+"--->OK");
    		} catch (SQLException e) {
    			System.out.println("向mysql中更新数据时发生异常!");
    			e.printStackTrace();
    			if(!committed){
    				// Discard the partial inserts so a retry does not add them again.
    				try {
    					con.rollback();
    				} catch (SQLException rollbackError) {
    					rollbackError.printStackTrace();
    				}
    			}
    		}
    	}
    
    	/**
    	 * Repeats "read the range, then write it" until {@code flag} is false.
    	 */
    	@Override
    	public void run() {
    		while(flag){
    			// One attempt: fetch the rows of this worker's range and hand them to the writer.
    			ArrayList<Member> fetched=getAll();
    			this.inputAll(fetched);
    		}
    	}


    测试类:

    public class Test1 {
    		DbManager dm=null;
    		MyThread my1=null;
    	/**
    	 * Creates the {@code DbManager}, then starts the worker threads based on its {@code n},
    	 * {@code residue} and {@code sum} fields.
    	 *
    	 * <p>If {@code n < 1}, a single worker handles rows {@code 1..residue}. Otherwise one
    	 * worker is started per full segment, plus one more for the rows after the last full
    	 * segment when there are any. The constructor returns without waiting for the workers.
    	 */
    	public Test1(){
    		dm=new DbManager();
    		System.out.println(dm.n+"----"+dm.residue);

    		if(dm.n<1){
    			// Fewer than one full segment (30000 rows): a single worker handles everything.
    			// The range is passed directly instead of being overwritten after construction.
    			my1=new MyThread(1,dm.residue);
    			Thread t1=new Thread(my1);
    			t1.start();
    		}else{
    			// At least one full segment: start n workers, one per segment of 30000 rows.
    			// NOTE(review): the single-argument MyThread constructor is not shown here;
    			// presumably it derives the row range from the segment index. Confirm.
    			for (int i = 1; i <=dm.n; i++) {
    				new Thread(new MyThread(i)).start();
    				try {
    					Thread.sleep(1);
    				} catch (InterruptedException e) {
    					// Keep starting the remaining workers, but restore the interrupt flag
    					// so code further up the call stack can still see it.
    					Thread.currentThread().interrupt();
    				}
    			}
    			// Rows after the last full segment. Skipped when the range is empty, so no
    			// worker (and no pair of database connections) is created for nothing.
    			int remainderStart=dm.n*30000+1;
    			if(remainderStart<=dm.sum){
    				my1=new MyThread(remainderStart,dm.sum);
    				Thread t1=new Thread(my1);
    				t1.start();
    			}
    		}
    	}
    	/**
    	 * Entry point. As written, the migration call is commented out and only the Windows
    	 * shutdown command is launched.
    	 *
    	 * @param args command-line arguments (unused)
    	 */
    	public static void main(String[] args) {
    		//new Test1();
    		// Intended to power the machine off once the migration is done.
    		// NOTE(review): the Test1 constructor starts its threads and returns immediately, so
    		// re-enabling the line above would not make this wait for the workers.
    		final String shutdownCommand="cmd  /c Shutdown -t 10";
    		try {
    			Runtime.getRuntime().exec(shutdownCommand);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}


    经过多次测试:从sqlserver中读取16w条数据并写入mysql,耗时15min左右。

    刚开始运行时会报错: java.lang.OutOfMemoryError: Java heap space

    解决方案:(myeclipse)window->Preferences->Java->Installed JREs,选择当前的JRE,然后edit它;在新窗口里设置Default VM Arguments为 -Xms512M -Xmx512M即可


  • 相关阅读:
    php学习之Model类
    PHP学习之图像处理-水印类
    PHP学习之文件上传类
    windows 安装gitea
    下载安装office2019
    第6章 互联网的那些事儿
    第五章 了解你的用户
    第四章 关于测试的一些思考
    第三章 web设计原则:
    第二章 编程之道
  • 原文地址:https://www.cnblogs.com/riskyer/p/3329100.html
Copyright © 2011-2022 走看看