Hadoop cluster installation and configuration: invoking the Sqoop service


    I. Generating the Java server-side and C# client-side code from Thrift

    Thrift interface definition (file name: sqoopthrift.thrift):

    namespace java com.javabloger.gen.code
    service SqoopThrift{   
    string CreateTable(1:string host, 2:string database, 3:string userName, 4:string password, 5:string tableName, 6:string hiveTaleb)
    string ImportDataInHive(1:string host, 2:string database, 3:string userName, 4:string password, 5:string tableName, 6:string hiveTaleb,7:string keyfield)
    string ImportDataInHiveByWhere(1:string host, 2:string database, 3:string userName, 4:string password, 5:string tableName, 6:string hiveTaleb, 7:string where, 8:list<string> columnNames,9:string keyfield)
    string ExportData(1:string host, 2:string database, 3:string userName, 4:string password, 5:string tableName, 6:string hiveTaleb)
    string ExportDataUpdate(1:string host, 2:string database, 3:string userName, 4:string password, 5:string tableName, 6:string hiveTaleb,7:string keyfield)
    }
    Thrift command: thrift -gen java XXX.thrift              (note: using the Thrift 0.9.0 compiler on Windows)

    The generated Java-side Thrift code is omitted here (a download link was provided in the original post).

    Thrift command: thrift -gen csharp XXX.thrift

    The generated C#-side Thrift code is omitted here (a download link was provided in the original post).
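    Since the generated files are omitted, the sketch below shows roughly what the Thrift 0.9.x Java generator produces for this service and which members the later sections rely on. This is only an outline of the generator's conventions, not the actual generated file:

    public class SqoopThrift {
        // Service interface implemented on the server side (see SqoopServiceImp in section III).
        public interface Iface {
            String CreateTable(String host, String database, String userName,
                    String password, String tableName, String hiveTaleb) throws org.apache.thrift.TException;
            String ImportDataInHive(String host, String database, String userName,
                    String password, String tableName, String hiveTaleb, String keyfield) throws org.apache.thrift.TException;
            // ... the remaining operations declared in the IDL ...
        }

        // Synchronous client used by callers (the C# counterpart is used in section II).
        public static class Client { /* generated */ }

        // Processor that dispatches incoming calls to an Iface implementation;
        // it is handed to the TThreadPoolServer in section III.3.
        public static class Processor<I extends Iface> { /* generated */ }
    }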

    II. C# client demo for calling the Sqoop service

    // Requires: using Thrift.Protocol; using Thrift.Transport;
    // sqoopServiceHost/sqoopServicePort are the host and port where the Sqoop Thrift service runs.
    var socket = new TSocket(sqoopServiceHost, sqoopServicePort);
    var transport = new TBufferedTransport(socket);
    var proto = new TBinaryProtocol(transport);
    var sqoopClient = new SqoopThrift.Client(proto);
    var feedback = "";
    try
    {
        if (!transport.IsOpen)
            transport.Open();
        // _host, _database, _userName, _password are the SQL Server connection details;
        // tableName is the source table, hiveTaleb the target Hive table and
        // keyfield the column used to split the import.
        feedback = sqoopClient.ImportDataInHive(_host, _database, _userName, _password,
            tableName, hiveTaleb, keyfield);
        transport.Close();
    }
    catch (Exception ex)
    {
        transport.Close();
    }
    III. Writing the Java-side Sqoop service

    The service uses Java to run shell commands that invoke Sqoop's functionality.

    1. The class that runs shell commands (adapted from an existing example, with some changes)

    package centos.shell;

    import java.io.BufferedReader;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class JavaShellUtil {
        // Directory where the per-invocation log files are written.
        private String basePath = "/tmp/";
        private String executeShellLogFile;

        public JavaShellUtil(String logPath) {
            if (!logPath.isEmpty())
                basePath = logPath;
            executeShellLogFile = basePath + "executeShell";
        }

        // Runs the given command through /bin/bash -c, collects stdout and stderr,
        // writes everything to a timestamped log file and returns it as a string.
        public String executeShell(String shellCommand) throws IOException {
            StringBuffer stringBuffer = new StringBuffer();
            BufferedReader bufferedReader = null;
            BufferedReader errorReader = null;
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss ");
            DateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");

            try {
                stringBuffer.append(dateFormat.format(new Date()))
                        .append("ready for cmd:").append(shellCommand).append("\n");
                String[] cmd = {"/bin/bash", "-c", shellCommand};
                Process pid = Runtime.getRuntime().exec(cmd);
                if (pid != null) {
                    stringBuffer.append("process:").append(pid.toString()).append("\n");
                    errorReader = new BufferedReader(new InputStreamReader(pid.getErrorStream()));
                    bufferedReader = new BufferedReader(new InputStreamReader(pid.getInputStream()));
                    // Note: for commands with very large output the streams should be
                    // drained before waitFor(), otherwise the child process may block.
                    pid.waitFor();
                } else {
                    stringBuffer.append("no pid\n");
                }
                stringBuffer.append(dateFormat.format(new Date())).append("Shell is over\nfeed back\n");
                String line;
                while (bufferedReader != null && (line = bufferedReader.readLine()) != null) {
                    stringBuffer.append(line).append("\n");
                }
                while (errorReader != null && (line = errorReader.readLine()) != null) {
                    stringBuffer.append(line).append("\n");
                }
            } catch (Exception e) {
                stringBuffer.append("running exception:\n").append(e.getMessage()).append("\n");
            } finally {
                if (bufferedReader != null) {
                    OutputStreamWriter outputStreamWriter = null;
                    try {
                        bufferedReader.close();
                        if (errorReader != null)
                            errorReader.close();
                        // Persist the collected output to a timestamped log file.
                        OutputStream outputStream =
                                new FileOutputStream(executeShellLogFile + dateFormat1.format(new Date()) + ".log");
                        outputStreamWriter = new OutputStreamWriter(outputStream, "UTF-8");
                        outputStreamWriter.write(stringBuffer.toString());
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        if (outputStreamWriter != null)
                            outputStreamWriter.close();
                    }
                }
            }
            return stringBuffer.toString();
        }
    }
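    A minimal usage sketch of this helper; the log directory and the command shown are placeholders, not values from the original post, and executeShell throws IOException, so call it from code that declares or handles it:

    // Hypothetical usage; both the log path and the command are examples only.
    JavaShellUtil shell = new JavaShellUtil("/tmp/sqooplog/");
    String output = shell.executeShell("/usr/local/sqoop/bin/sqoop version");
    System.out.println(output);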
    

    2. Business logic implementation

    package centos.shell;

    import java.io.IOException;
    import java.util.List;

    import org.apache.thrift.TException;

    public class SqoopServiceImp implements SqoopThrift.Iface {
        private JavaShellUtil cmdHelper;
        // Base directory of the Sqoop installation.
        private String shellpath = "/usr/local/sqoop/";

        public SqoopServiceImp(String shellbasepath, String shellLogPath) {
            if (!shellbasepath.isEmpty()) {
                shellpath = shellbasepath;
            }
            cmdHelper = new JavaShellUtil(shellLogPath);
        }

        // Export a Hive table (stored under /user/hive/warehouse/) into a SQL Server table.
        @Override
        public String ExportData(String host, String database, String userName,
                String password, String tableName, String hiveTaleb) throws TException {
            System.out.println("begin ExportData " + tableName);
            String feedback = "";
            // \001 is Hive's default field delimiter; the original post shows '01',
            // the leading backslash having apparently been lost in formatting.
            String shellCommand = shellpath + "bin/sqoop export --connect 'jdbc:sqlserver://" + host
                    + ";username=" + userName + ";password=" + password + ";database=" + database
                    + "' --table " + tableName + " --export-dir /user/hive/warehouse/" + hiveTaleb
                    + " --input-fields-terminated-by '\\001'";
            try {
                feedback = cmdHelper.executeShell(shellCommand);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("ExportData over");
            return feedback;
        }

        // Export with --update-key so existing rows keyed by keyfield are updated.
        @Override
        public String ExportDataUpdate(String host, String database, String userName,
                String password, String tableName, String hiveTaleb, String keyfield) throws TException {
            System.out.println("begin ExportDataUpdate " + tableName);
            String feedback = "";
            String shellCommand = shellpath + "bin/sqoop export --connect 'jdbc:sqlserver://" + host
                    + ";username=" + userName + ";password=" + password + ";database=" + database
                    + "' --table " + tableName + " --update-key '" + keyfield
                    + "' --export-dir /user/hive/warehouse/" + hiveTaleb
                    + " --input-fields-terminated-by '\\001'";
            try {
                feedback = cmdHelper.executeShell(shellCommand);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("ExportDataUpdate over");
            return feedback;
        }

        // Create a Hive table whose schema mirrors the given SQL Server table.
        @Override
        public String CreateTable(String host, String database, String userName,
                String password, String tableName, String hiveTaleb) throws TException {
            System.out.println("begin create hive table " + tableName);
            String feedback = "";
            String shellCommand = shellpath + "bin/sqoop create-hive-table --connect 'jdbc:sqlserver://" + host
                    + ";username=" + userName + ";password=" + password + ";database=" + database
                    + "' --table " + tableName + " --hive-table " + hiveTaleb;
            try {
                feedback = cmdHelper.executeShell(shellCommand);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("create hive table over");
            return feedback;
        }

        // Import an entire SQL Server table into Hive, splitting the work by keyfield.
        @Override
        public String ImportDataInHive(String host, String database, String userName,
                String password, String tableName, String hiveTaleb, String keyfield) throws TException {
            System.out.println("begin ImportDataInHive " + tableName);
            String feedback = "";
            String shellCommand = shellpath + "bin/sqoop import --connect 'jdbc:sqlserver://" + host
                    + ";username=" + userName + ";password=" + password + ";database=" + database
                    + "' --table " + tableName + " --hive-import --hive-table " + hiveTaleb
                    + " --split-by '" + keyfield + "'";
            try {
                feedback = cmdHelper.executeShell(shellCommand);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("ImportDataInHive over");
            return feedback;
        }

        // Import only the selected columns and the rows matching the WHERE clause into Hive.
        @Override
        public String ImportDataInHiveByWhere(String host, String database, String userName,
                String password, String tableName, String hiveTaleb, String where,
                List<String> columnNames, String keyfield) throws TException {
            System.out.println("begin ImportDataInHiveByWhere " + tableName);
            // Build a comma-separated column list for --columns.
            StringBuilder fields = new StringBuilder();
            for (int i = 0; i < columnNames.size(); i++) {
                if (i > 0) {
                    fields.append(",");
                }
                fields.append(columnNames.get(i));
            }
            String feedback = "";
            String shellCommand = shellpath + "bin/sqoop import --connect 'jdbc:sqlserver://" + host
                    + ";username=" + userName + ";password=" + password + ";database=" + database
                    + "' --table " + tableName + " --where '" + where + "' --columns '" + fields
                    + "' --hive-import --hive-table " + hiveTaleb
                    + " --split-by '" + keyfield + "'";
            try {
                feedback = cmdHelper.executeShell(shellCommand);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("ImportDataInHiveByWhere over");
            return feedback;
        }
    }
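    To make the string concatenation easier to read: with hypothetical values (Sqoop under /usr/local/sqoop/, SQL Server host db01, user sa, password ***, database Sales, table Orders, Hive table orders, key column OrderId), ImportDataInHive assembles a command along these lines:

    /usr/local/sqoop/bin/sqoop import --connect 'jdbc:sqlserver://db01;username=sa;password=***;database=Sales' --table Orders --hive-import --hive-table orders --split-by 'OrderId'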
    
    3. Running the service

    import java.io.IOException;

    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TBinaryProtocol.Factory;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadPoolServer;
    import org.apache.thrift.server.TThreadPoolServer.Args;
    import org.apache.thrift.transport.TServerSocket;
    import org.apache.thrift.transport.TTransportException;

    import centos.shell.SqoopServiceImp;
    import centos.shell.SqoopThrift;

    public class shelltest {

        // Port the Thrift service listens on; the original post leaves this as a
        // placeholder, so replace it with the port you want to use.
        private static final int SERVER_PORT = 9090;

        /**
         * @param args args[0] is the Sqoop installation path, args[1] is the shell log path
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
            System.out.println("sqoop thrift server is starting!");
            startServer(args[0], args[1]);
        }

        public static void startServer(String shellpath, String logPath) {
            try {
                // Listen for client connections on the configured port.
                TServerSocket serverTransport = new TServerSocket(SERVER_PORT);

                // Dispatch incoming calls to the SqoopServiceImp handler.
                SqoopThrift.Processor<SqoopServiceImp> process =
                        new SqoopThrift.Processor<SqoopServiceImp>(new SqoopServiceImp(shellpath, logPath));

                Factory portFactory = new TBinaryProtocol.Factory(true, true);

                Args serverArgs = new Args(serverTransport);
                serverArgs.processor(process);
                serverArgs.protocolFactory(portFactory);

                // Thread-pool server: one worker thread per client connection.
                TServer server = new TThreadPoolServer(serverArgs);
                server.serve();
            } catch (TTransportException e) {
                e.printStackTrace();
            }
        }
    }
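    To test the server directly from Java (instead of the C# client in section II), a client can be built in the same way. A minimal sketch, assuming the server listens on port 9090; all host names and call parameters below are placeholders:

    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    import centos.shell.SqoopThrift;

    public class SqoopClientDemo {
        public static void main(String[] args) throws Exception {
            // Connect to the Sqoop Thrift server (host and port are placeholders).
            TTransport transport = new TSocket("sqoop-server-host", 9090);
            transport.open();
            SqoopThrift.Client client = new SqoopThrift.Client(new TBinaryProtocol(transport));
            // Import one SQL Server table into Hive, split by its key column.
            String feedback = client.ImportDataInHive("db-host", "db-name", "db-user",
                    "db-password", "source_table", "hive_table", "id");
            System.out.println(feedback);
            transport.close();
        }
    }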
    

    4. Note: this project depends on the following libraries:

    libthrift-0.9.0.jar
    log4j-1.2.16.jar
    slf4j-log4j12-1.6.1.jar
    slf4j-api-1.6.1.jar

    5. Starting the service

    Command format: java -jar <name of the generated jar> <Sqoop installation path> <shell log path>

    To run it in the background: java -jar <name of the generated jar> <Sqoop installation path> <shell log path> &
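    For example, assuming the packaged jar is named sqoopserver.jar (a hypothetical name) and using /usr/local/sqoop/ and /tmp/sqooplog/ as the Sqoop installation path and log path:

    java -jar sqoopserver.jar /usr/local/sqoop/ /tmp/sqooplog/ &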
