zoukankan      html  css  js  c++  java
  • 通过hive自定义函数直接回写数据到数据库

    hive一般用来执行离线统计分析相关的功能,然后将执行的结果导入到数据库的表中供前端报表可视化展现来查询。

    导回数据库的方式有许多,sqoop、hive jdbc、mr jdbc等等,但是这几种方式都会有一个二次处理环节(数据需要人工)。

    这次介绍另外一种处理方式,直接将对数据库的操作集成在udf中,这样直接写一个hql查询语句就可以了。

    代码如下:

    package com.taisenki.tools;
    
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Properties;
    
    import org.apache.commons.io.FilenameUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.IntWritable;
    
    /**
     * 在hive0.13版本之上才能注册永久函数,否则只能注册临时函数
     * @author taisenki
     *
     */
    @Description(name = "batch_import",  value = "_FUNC_(sql, args1[, args2,...][, config_path]) - Return ret ")  
    public class SqlBatchImportUDF extends GenericUDF {
    
        public static final String DEFAULT_CONFIG_ROOT_PATH = "/user/hive/udf/sjk/"; 
        public static final String DEFAULT_CONFIG_FILE_NAME = "sjk.properties";  
        public static final String DEFAULT_CONFIG_FILE_SUFFIX = "properties";  
        private IntObjectInspector retValInspector; 
        private String sql;   
        private PrimitiveObjectInspector[] paramsInspectors;  
        private int insert = 0;
        private Connection conn;
        private PreparedStatement psi;
        private int count = 0;
        
        @Override
        public void close() throws IOException {
            // TODO Auto-generated method stub
            try {if (insert > 0) {
                    psi.executeBatch();
                    conn.commit();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }finally{
                try {
                    if(conn != null)
                        conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            super.close();
        }
    
        @Override
        public Object evaluate(DeferredObject[] arg0) throws HiveException {
            // TODO Auto-generated method stub
            try {
                for (int i = 0; i < count; i++) {  
                    Object param = paramsInspectors[i].getPrimitiveJavaObject(arg0[i + 1].get());  
                    psi.setObject(i + 1, param);  
                }  
                psi.addBatch();
                insert++;
                if(insert>1000){
                    psi.executeBatch();
                    conn.commit();
                    insert = 0;
                }  
                IntWritable iw = new IntWritable(insert);  
                return retValInspector.getPrimitiveWritableObject(iw);  
    
            } catch (SQLException e) {
                e.printStackTrace();
                throw new HiveException(e);
            }
        }
    
        @Override
        public String getDisplayString(String[] arg0) {
            // TODO Auto-generated method stub
            return "batch_import(sql, args1[, args2,...][, config_path])"; 
        }
    
        @Override
        public ObjectInspector initialize(ObjectInspector[] arg0)
                throws UDFArgumentException {
            // TODO Auto-generated method stub
            if (arg0.length < 2) {  
                throw new UDFArgumentException(" Expecting  at least two arguments ");  
            }  
            insert = 0;
            //第一个参数校验,必须是一个非空的sql语句  
            if (arg0[0].getCategory() == Category.PRIMITIVE  
                    && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {  
                if (!(arg0[0] instanceof ConstantObjectInspector)) {  
                    throw new UDFArgumentException("the frist arg   must be a sql string constant");  
                }  
                ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[0];  
                this.sql = sqlInsp.getWritableConstantValue().toString();  
                int i = -1;
                count = 0;
                while (true) {
                    i = sql.indexOf("?", i + 1);
                    if (i == -1) {
                        break;
                    }
                    count++;
                }
                if (this.sql == null || this.sql.trim().length() == 0) {  
                    throw new UDFArgumentException("the frist arg   must be a sql string constant and not nullable");  
                }  
            }  
            
            if (count+1 > arg0.length){
                throw new UDFArgumentException("arguments not enough with this sql["+(arg0.length-1)/count+"]");  
            }
            
            //默认情况
            String fileName1 = SqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + SqlBatchImportUDF.DEFAULT_CONFIG_FILE_NAME;
            //判断是否存在指定的配置文件路径
            if (count+1 < arg0.length){
                //第一个参数校验  
                if (arg0[count+1].getCategory() == Category.PRIMITIVE  
                        && ((PrimitiveObjectInspector) arg0[count+1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {  
                    if (!(arg0[count+1] instanceof ConstantObjectInspector)) {  
                        throw new UDFArgumentException("mysql connection pool config path  must be constant");  
                    }  
                    ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[count+1];  
          
                    fileName1 = propertiesPath.getWritableConstantValue().toString();  
                    Path path1 = new Path(fileName1);  
                    if (path1.toUri().getScheme() == null) {  
                        if (!"".equals(FilenameUtils.getExtension(fileName1)) && !DEFAULT_CONFIG_FILE_SUFFIX.equals(FilenameUtils.getExtension(fileName1))) {  
                            throw new UDFArgumentException("不支持的文件扩展名,目前只支持properties文件!");  
                        }  
                        //如果是相对路径,补齐根路径  
                        if (!fileName1.startsWith("/")) {  
                            fileName1 = SqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + fileName1;  
                        }  
                    }  
                    //如果只写了文件前缀的话,补上后缀  
                    if (!FilenameUtils.isExtension(fileName1, DEFAULT_CONFIG_FILE_SUFFIX)) {  
                        fileName1 = fileName1 + FilenameUtils.EXTENSION_SEPARATOR_STR + DEFAULT_CONFIG_FILE_SUFFIX;  
                    }  
                } 
            }
    
            Properties properties = new Properties();  
            Configuration conf = new Configuration();  
            Path path2 = new Path(fileName1);  
    
            try (FileSystem fs = FileSystem.newInstance(path2.toUri(), conf); //这里不能用FileSystem.get(path2.toUri(), conf),必须得重新newInstance,get出来的是共享的连接,这边关闭的话,会导致后面执行完之后可能出现FileSystem is closed的异常  
                 InputStream in = fs.open(path2)) {  
                properties.load(in);
            } catch (FileNotFoundException ex) {  
                throw new UDFArgumentException("在文件系统中或者是HDFS上没有找到对应的配置文件");  
            } catch (Exception e) {  
                e.printStackTrace();  
                throw new UDFArgumentException(e);  
            }  
            
            try {
                Class.forName(properties.getProperty("driverClassName"));
                System.out.println(properties.getProperty("driverClassName"));
                System.out.println(properties.getProperty("url"));
                conn = DriverManager.getConnection(properties.getProperty("url"), properties);
                psi = conn.prepareStatement(sql);
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                throw new UDFArgumentException(e);  
            } catch (ClassNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                throw new UDFArgumentException(e);  
            }  
            //中间为参数
            paramsInspectors = new PrimitiveObjectInspector[count];  
            for (int i = 0; i < count; i++) {  
                paramsInspectors[i] = (PrimitiveObjectInspector) arg0[i+1];  
            }  
            retValInspector = PrimitiveObjectInspectorFactory.writableIntObjectInspector; 
            return retValInspector;
        }
    
    }

    然后上传jar包,注册udf,注意此处需把对应数据库的驱动包一同进行注册操作:

    如回写oracle: create function default.oracleSave as 'com.taisenki.tools.SqlBatchImportUDF' using jar 'hdfs://cdh5/data/lib/test.jar', jar 'hdfs://cdh5/data/lib/ojdbc6.jar';

    然后写一个HQL测试一下:

    select oracleSave('insert into test111 values (?)',b.id) from (select 2 id from dual) b;

    UDF第一个参数是静态参数,是对应数据库的sql语句,描述入库方式,然后后面的参数就不固定了,一一对应sql语句中的占位符,比如我上面有1个占位符,然后我后面就跟了1个参数。

    若传入的参数恰好比占位符多1个的时候,最后一个参数则为指定数据库配置文件名,里面配置了如何开启连接池连接哪个数据库什么的。

    附上一个默认的sjk.properties:

    driverClassName=oracle.jdbc.driver.OracleDriver
    url=jdbc:oracle:thin:@host:port:inst
    user=test
    password=test

    此处注意,如果是hive 0.13以下的版本,是不支持注册永久function的,请使用
    create temporaryfunction来进行,而且只支持session级别的,断开后自动消失……

  • 相关阅读:
    Java中只有按值传递,没有按引用传递!(两种参数情况下都是值传递)
    最简单的struts实例介绍
    Spring中bean的五个作用域简介(转载)
    Spring配置文件
    轻松搞定面试中的二叉树题目 (转)
    二叉树
    稳定排序与非稳定排序判别方法
    Yii的缓存机制之动态缓存
    Yii的缓存机制之数据缓存
    Yii的缓存机制之页面缓存
  • 原文地址:https://www.cnblogs.com/taisenki/p/6224642.html
Copyright © 2011-2022 走看看