zoukankan      html  css  js  c++  java
  • hive表增量抽取到oracle数据库的通用程序(一)

    hive表增量抽取到oracle数据库的通用程序(二)

    sqoop在export的时候 只能通过--export-dir参数来指定hdfs的路径。而目前的需求是需要将hive中某个表中的多个分区记录一次性导出到oracle数据库中,由于不支持通配符,又不想设置多个workflow。为了替代蹩脚的sqoop,准备使用java来开发通用包来替代这个导出功能。

    通过给java程序提供具体的参数,完成数据的拉取。

    为了与sqoop传参方式一致,使用了--开头(在java程序中其实是将--过滤掉了),相关的参数说明如下:

    --hive_driver        hive的驱动
    --hive_url            hiveserver2的连接url
    --hive_username        连接hive2的用户名
    --hive_password        连接hive2的密码
    --hive_hql            要查询的hql语句
    --rdms_driver        要导入到的关系型数据库的驱动
    --rdms_url            关系型数据库的连接url
    --rdms_username        关系型数据库的用户名
    --rdms_password        关系型数据库的密码
    --rdms_tableName    关系型数据库的表名
    --rdms_columnNames  关系型数据库要插入的字段名
    --rdms_presql        关系型数据库的预处理sql语句,可用于 导入数据前先执行删除记录,防止重复导入。
    com.gw.exe.Hive2RMDS即为我自己定义的java程序。
    package com.gw.exe;
    
    import java.lang.reflect.Field;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import com.gw.exe.DBUtil;
    
    public class Hive2RMDS {
        
        private static Connection conn = null;
        private static PreparedStatement ps = null;
        private static ResultSet rs = null;
        
        // hive参数
        private static String hive_driver = "org.apache.hive.jdbc.HiveDriver";
        private static String hive_url = null;
        private static String hive_username = "hive2";
        private static String hive_password = "";
        private static String hive_hql = null;
        
        // RDMS参数
        private static String rdms_driver = "oracle.jdbc.driver.OracleDriver";
        private static String rdms_url = null;
        private static String rdms_username = null;
        private static String rdms_password = null;
        private static String rdms_presql = null;
        private static String rdms_tableName = null;
        private static String rdms_columnNames = null;
    
        
        public static void main(String[] args) throws Exception {
            
           Map<String,String> map = getParams(args);
            init(map);
            
            //预先处理rdms sql
            if(map.containsKey("rdms_presql")){
                exePreSql();
            }
    
            //
            List<Map<String,String>> list = getHiveList();
            if(list!=null && list.size() > 0){
                insertRdms(list);
            } else {
                throw new Exception("hive中未查询到记录");
            }
        }
        
        
        private static void init(Map<String,String> map){
         //通过反射将传进来的参数赋值给静态变量
            String[] paramNames = {"hive_driver","hive_url","hive_username","hive_password","hive_hql",
                                   "rdms_driver","rdms_url","rdms_username","rdms_password",
                                   "rdms_presql","rdms_tableName","rdms_columnNames"};
            Hive2RMDS h2r = new Hive2RMDS();
            for(String paramName:paramNames){
                if(map.containsKey(paramName)){
                    try {
                        Field field = Hive2RMDS.class.getDeclaredField(paramName);
                        field.setAccessible(true);
                        field.set(h2r, map.get(paramName));
                    } catch (IllegalArgumentException | IllegalAccessException |NoSuchFieldException | SecurityException e) {
                        e.printStackTrace();
                    }
                }
            }
            
            //打印参数值:
            System.out.println("解析后参数如下:");
            for(String paramName:paramNames){
                if (map.containsKey(paramName)){
                    System.out.println("key:" + paramName + " value:" + map.get(paramName));
                }
            }
        }
        
        //解析参数
        private static Map<String,String> getParams(String[] params){
            Map<String,String> map = new HashMap<String,String>();
            String key = null;
            String value = null;
            for(int i=0;i<params.length; i++){
                if(i%2==0){
                    key = params[i].startsWith("--") ? params[i].substring(2) : params[i];
                }
                if(i%2==1){
                    value = params[i];
                    map.put(key, value);
                    key = null; value = null;
                }
                
            }
            return map;
        }
        
        
        //获取hive记录
        private static List<Map<String,String>> getHiveList(){
            
            String[] columns = rdms_columnNames.split(",");
            List<Map<String,String>> list = null;
            try {
                conn = DBUtil.getConnection(hive_driver,hive_url,hive_username,hive_password);
                
    //            ps = conn.prepareStatement(hive_hql);
                Statement ps = conn.createStatement();
                rs = ps.executeQuery(hive_hql);
                
                list = new ArrayList<Map<String,String>>();
                while(rs.next()){
                    Map<String,String> map = new HashMap<String,String>();
                    for(String column:columns){
                        String value = rs.getString(column);
                        map.put(column, value);
    //                    System.out.println("column:" + column + " value:" + value);
                    }
                    
                    list.add(map);
                }
            } catch (SQLException e) {
                e.printStackTrace();
    //            System.exit(1);
            } finally {
                DBUtil.close(rs, ps, conn);
            }
            System.out.println("hive中获取到记录数:" + (list!=null?list.size():0));
            return list;
            
        }
        
        //写入rdms
        private static void insertRdms(List<Map<String,String>> list){
            
            try {
                String[] columns = rdms_columnNames.split(",");
                conn = DBUtil.getConnection(rdms_driver,rdms_url,rdms_username,rdms_password);
                System.out.println("Connect:" + conn.toString());
                //插入oracle
                String sql = getInsertSqlString(rdms_tableName, rdms_columnNames.toUpperCase());
                System.out.println("sql:" + sql);
                
                conn.setAutoCommit(false);
                
                ps = conn.prepareStatement(sql);
                for(Map<String,String> map:list){
                    for(int i=1; i<= columns.length;i++){
                        ps.setObject(i, map.get(columns[i-1]));
                    }
                    ps.addBatch();  
                }
                
                int[] result = ps.executeBatch(); 
                conn.commit();
                
                System.out.println("insert : " + result.length); 
                //提交,设置事务初始值  
            } catch (SQLException e) {
                e.printStackTrace();
            } finally{
                DBUtil.close(rs, ps, conn);
            }
    
        }
        
        //根据表名和字段拼接insert sql
        private static String getInsertSqlString(String tableName,String fieldNames){
            int size = fieldNames.split(",").length;
            StringBuffer sb = new StringBuffer("");
            sb.append("insert into ").append(tableName).append("(").append(fieldNames).append(")")
            .append("values(").append(String.join(",",Collections.nCopies(size,"?"))).append(")");
            return sb.toString();
        }
    
        //预先执行的sql
        private static void exePreSql(){
            Statement statement = null;
            try {
                conn = DBUtil.getConnection(rdms_driver,rdms_url,rdms_username,rdms_password);
                statement = conn.createStatement();    
                int size = statement.executeUpdate(rdms_presql);  
                System.out.println("pre sql process record size : " + size); 
            } catch (SQLException e) {
                e.printStackTrace();
            } finally{
                try {
                    if(statement != null){statement.close();}
                }catch (SQLException e) {
                    e.printStackTrace();
                } 
                try {
                    if(conn != null){conn.close();}
                } catch (SQLException e) {
                    e.printStackTrace();
                }  
            }
        }
    }

    数据库连接 DBUtil.java

    package com.gw.exe;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    public class DBUtil {
    
        private static Connection conn = null;
    
        // 得到连接
        public static Connection getConnection(String driver,String url,String username,String passwd) {
            try
            {
                Class.forName(driver);
                conn = DriverManager.getConnection(url, username, passwd);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return conn;
        }
    
        // 关闭资源
        public static void close(ResultSet rs, Statement ps, Connection conn) {
            // 关闭资源(先开后关)
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
                rs = null;
            }
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
                ps = null;
            }
            if (null != conn) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
                conn = null;
            }
        }
    
    }

    程序在执行过程中需要依赖hive的相关jar包,以下为最小依赖包(使用过程中缺少什么包,自己添加):

    exehive2.jar是开发 的程序包。

    说明: 

    1. 由于是将查询的结果一次性保存到List列表中,比较适合小规模的数据导出到关系型数据库中,比如几万记录,在导入到oracle中由于开启了批处理,因此效率很高。

        可以自行修改程序,增加一个阈值比如10万,在获取hive结果时,增加判断,达到10万记录提交一次到oracle,不足10万条记录直接提交就可以了。

    2. 没有update功能,使用时,可以先删除,后insert。

    3. 程序应用版本: jdk1.8, hadoop2.7.3,hive.2.3.0 oozie4.3.0

  • 相关阅读:
    修改Nginx的header伪装服务器
    解除与设置计算机锁定
    Adobe flash cs5 的Java运行时环境初始化错误 完美解决方法
    js正则表达式教程
    Eclipse窗口显示:独立、嵌入式
    [置顶] wzplayer for android NEON版本(添加插图)
    买了一块s5pv210 的开发板
    [置顶] android player ,wzplayer for android NEON版本(添加插图)
    make 输出 log 文件
    android player ,wzplayer for android NEON版本(添加插图)
  • 原文地址:https://www.cnblogs.com/30go/p/9056786.html
Copyright © 2011-2022 走看看