  • Read a Hive data file and import the data into HBase (generate HFiles with MapReduce for bulk loading)

      

    package cn.tansun.bd.hbase;

    import java.io.IOException;
    import java.net.URI;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import cn.tansun.bd.utils.JDBCUtils;

    /**
     * Reads a Hive text file with MapReduce and writes HBase HFiles for bulk loading.
     *
     * @author zgl (E-mail: zgl)
     * @version created 2016-07-05 19:57:17
     */
    public class HiveMySQl2HBaseMR extends Configured implements Tool {

        // HBase mapping metadata read from MySQL: target table, column family,
        // row-key column indexes, and "hiveColumnIndex<TAB>qualifierName" column definitions
        public static String tableName;
        public static String cf = null;
        public static String strRowkey = null;
        public static String strIndex = null;
        public static String column_name = null;
        public static String strColumn = null;
        private static Configuration conf = null;

        public static void main(String[] args) {
            getDatas();
            try {
                int runs = ToolRunner.run(new HiveMySQl2HBaseMR(), args);
                System.exit(runs);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Read the column family, row key and column/qualifier metadata from MySQL
        @SuppressWarnings("rawtypes")
        public static List<Map> getDatas() {
            String sql = "SELECT DISTINCT s.tableName, ar.rowkey, af.column_family, aq.column_hive_index, aq.column_name FROM"
                    + " archive_htable s, archive_hrowkey ar, archive_hfamily af, archive_hqualifier aq WHERE"
                    + " s.rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id AND s.tableName = '2'";
            List<Map> selectDatas = JDBCUtils.selectDatas(sql);
            for (Map<String, String> metaData : selectDatas) {
                if (null == tableName) {
                    tableName = metaData.get("tableName");
                }
                if (null == cf) {
                    cf = metaData.get("column_family");
                }
                if (null == strRowkey) {
                    strRowkey = metaData.get("rowkey");
                }

                // Accumulate "hiveColumnIndex<TAB>qualifierName" pairs separated by commas
                String strTempIndex = metaData.get("column_hive_index");
                String strTempName = metaData.get("column_name");
                if (null == strColumn || "".equals(strColumn)) {
                    strColumn = strTempIndex + "\t" + strTempName;
                } else {
                    strColumn = strColumn + "," + strTempIndex + "\t" + strTempName;
                }
            }
            return selectDatas;
        }

        @SuppressWarnings("deprecation")
    
        public int run(String[] args) throws Exception {
    
            /*
    
             * if (args.length != 3){ System.err.println(
    
             * "Usage: HiveMySQl2HBaseMR <table_name><data_input_path><hfile_output_path>"
    
             * ); System.exit( -1 ); }
    
             */
    
            conf  = new Configuration();
    
            conf.addResource("hbase-site.xml");
    
    
    
            String table = "2";
    
            String input = "hdfs://node11:9000/datas/hivedata5";
    
            String output = "hdfs://node11:9000/datas/out1";
    
    
    
            HTable htable;
    
            try {
    
                // 运行前,删除已存在的中间输出目录
    
                try {
    
                    FileSystem fs = FileSystem.get(URI.create(output), conf);
    
                    fs.delete(new Path(output), true);
    
                    fs.close();
    
                } catch (IOException e1) {
    
                    e1.printStackTrace();
    
                }
    
    
    
                htable = new HTable(conf, table.getBytes());
    
                Job job = new Job(conf);
    
                job.setJobName("Generate HFile");
    
    
    
                job.setJarByClass(HiveMySQl2HBaseMR.class);
    
                job.setInputFormatClass(TextInputFormat.class);
    
                job.setMapperClass(HiveMySQlMapper.class);
    
                FileInputFormat.setInputPaths(job, input);
    
    
    
                job.getConfiguration().set("mapred.mapoutput.key.class",
    
                        "org.apache.hadoop.hbase.io.ImmutableBytesWritable");
    
                job.getConfiguration().set("mapred.mapoutput.value.class",
    
                        "org.apache.hadoop.hbase.KeyValue");
    
    
    
                FileOutputFormat.setOutputPath(job, new Path(output));
    
    
    
                HFileOutputFormat2.configureIncrementalLoad(job, htable);
    
    
    
                try {
    
                    job.waitForCompletion(true);
    
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
    
                } catch (ClassNotFoundException e) {
    
                    e.printStackTrace();
    
                }
    
            } catch (IOException e) {
    
                e.printStackTrace();
    
            }
    
            return 0;
    
        }
    
    
    
        public static class HiveMySQlMapper extends
                Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

            @Override
            protected void setup(
                    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
                    throws IOException, InterruptedException {
                super.setup(context);
                conf = new Configuration();
            }

            // Note: these fields are populated by getDatas() in the client JVM; in a fully
            // distributed run they would have to be passed to the mappers via the Configuration.
            String tableName = HiveMySQl2HBaseMR.tableName;
            String cf = HiveMySQl2HBaseMR.cf;
            String rowKey = HiveMySQl2HBaseMR.strRowkey;
            String strColumnName = HiveMySQl2HBaseMR.column_name;
            String strColumn = HiveMySQl2HBaseMR.strColumn;
            String split = "001"; // separator inserted between the row-key parts

            @Override
            protected void map(
                    LongWritable key,
                    Text value,
                    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
                    throws IOException, InterruptedException {
                // Build the row key from the configured column indexes of the tab-separated Hive line
                String strRowKey = "";
                String[] datas = value.toString().split("\t");

                for (String strIndex : rowKey.split(",")) {
                    if ("".equals(strRowKey)) {
                        strRowKey = datas[Integer.valueOf(strIndex)];
                    } else {
                        strRowKey = strRowKey + split + datas[Integer.valueOf(strIndex)];
                    }
                }

                // strColumn holds "hiveColumnIndex<TAB>qualifierName" pairs separated by commas
                for (String str : strColumn.split(",")) {
                    String[] columnTupe = str.split("\t");
                    String columnData = datas[Integer.valueOf(columnTupe[0])];
                    String columnName = columnTupe[1];

                    // The map output key must carry the same row bytes as the emitted KeyValue
                    ImmutableBytesWritable rk = new ImmutableBytesWritable(
                            Bytes.toBytes(strRowKey));
                    // KeyValue(byte[] row, byte[] family, byte[] qualifier, byte[] value)
                    KeyValue kv = new KeyValue(Bytes.toBytes(strRowKey),
                            cf.getBytes(), Bytes.toBytes(columnName),
                            Bytes.toBytes(columnData));
                    context.write(rk, kv);
                }
            }
        }
    }
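
HFileOutputFormat2 only writes HFiles under the output directory; a separate step still has to hand them to HBase. Below is a minimal sketch of that final bulk-load step, assuming the same table name and output path used in run() and the older HTable-based LoadIncrementalHFiles API that matches the deprecated calls above (the class name BulkLoadHFiles is illustrative, not from the original post):

    package cn.tansun.bd.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    /** Hedged sketch: move the generated HFiles into the HBase table after the job above succeeds. */
    public class BulkLoadHFiles {
        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();   // picks up hbase-site.xml from the classpath
            String output = "hdfs://node11:9000/datas/out1";    // same HFile directory as in run()
            HTable htable = new HTable(conf, "2");               // same table name as in run()
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(new Path(output), htable);         // moves the HFiles into the table's regions
            htable.close();
        }
    }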

The JDBCUtils class:

    package cn.tansun.bd.utils;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Properties;
    import java.util.Set;

    /**
     * Simple JDBC helper for reading the HBase mapping metadata from MySQL.
     *
     * @author zgl (E-mail: zgl)
     * @version created 2016-06-23 16:25:03
     */
    public class JDBCUtils {

        public JDBCUtils() {
        }

        public static String PATH = "jdbc.properties";
        public static Properties prop;
        public static String url = null;
        public static String username = null;
        public static String password = null;
        public static Connection conn;
        public static Statement stmt;
        public static ResultSet rs;
        public static String fileName = null;

        // Load the connection settings (jdbc.url, jdbc.username, jdbc.password)
        // from jdbc.properties on the classpath
        static {
            try {
                InputStream inputStream = JDBCUtils.class.getClassLoader().getResourceAsStream( PATH );
                prop = new Properties();
                prop.load( inputStream );
                url = prop.getProperty( "jdbc.url" );
                username = prop.getProperty( "jdbc.username" );
                password = prop.getProperty( "jdbc.password" );
                if ( inputStream != null ) {
                    inputStream.close();
                }
            }
            catch ( IOException e ) {
                e.printStackTrace();
            }
        }

        public static void closeConnection( Connection conn ) {
            if ( conn != null ) {
                try {
                    conn.close();
                }
                catch ( SQLException e ) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Run a query and return every row as a columnName -> value map.
         *
         * @param sql the SELECT statement to execute
         * @return one Map per result row
         */
        @SuppressWarnings( "rawtypes" )
        public static List<Map> selectDatas( String sql ) {
            List<Map> listDatas = new ArrayList<Map>();
            try {
                conn = DriverManager.getConnection( url, username, password );
                conn.setAutoCommit( false );
                stmt = conn.createStatement();
                rs = stmt.executeQuery( sql );
                if ( rs != null ) {
                    ResultSetMetaData metaData = rs.getMetaData();
                    int count = metaData.getColumnCount();
                    Map<String, Object> map = null;
                    while ( rs.next() ) {
                        map = new HashMap<String, Object>();
                        for ( int i = 1; i < count + 1; i++ ) {
                            map.put( metaData.getColumnName( i ), rs.getObject( i ) );
                        }
                        listDatas.add( map );
                    }
                }
            }
            catch ( SQLException e ) {
                e.printStackTrace();
            }
            return listDatas;
        }

        /**
         * Run a query and return the column values of every row as a flat list of strings.
         *
         * @param sql the SELECT statement to execute
         * @return the values of every row, column by column
         */
        public static List<String> getStrMap( String sql ) {
            List<String> strList = new ArrayList<String>();
            try {
                conn = DriverManager.getConnection( url, username, password );
                conn.setAutoCommit( false );
                stmt = conn.createStatement();
                rs = stmt.executeQuery( sql );
                if ( rs != null ) {
                    ResultSetMetaData metaData = rs.getMetaData();
                    int count = metaData.getColumnCount();
                    while ( rs.next() ) {
                        for ( int i = 1; i < count + 1; i++ ) {
                            String str2 = (String) rs.getObject( i );
                            strList.add( str2 );
                        }
                    }
                }
            }
            catch ( SQLException e ) {
                e.printStackTrace();
            }
            return strList;
        }

        public static String table_name = null;
        public static String rowkey = null;
        public static String column_family = null;
        public static String column_name = null;
        private static String rows = null;
        public static String sql = null;
        public static String sql2 = null;

        @SuppressWarnings( "rawtypes" )
        public static void main( String[] args ) {
            sql2 = "SELECT GROUP_CONCAT(DISTINCT aq.column_hive_index, ' ', aq.column_name, ' ' ORDER BY "
                    + "aq.column_hive_index SEPARATOR ',') AS column_names FROM archive_hqualifier aq "
                    + "WHERE aq.table_id = 77 GROUP BY aq.column_name ORDER BY aq.column_hive_index";
            sql = "SELECT DISTINCT s.tableName, ar.rowkey, af.column_family, aq.column_name "
                    + "FROM archive_htable s, archive_hrowkey ar, archive_hfamily af, archive_hqualifier aq "
                    + "WHERE s.rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id "
                    + "AND af.qualifier_id = aq.qualifier_id";
            String datas = null;
            List<String> strList = getStrMap( sql );
            for ( int i = 0; i < strList.size(); i++ ) {
                datas = strList.get( i );
                System.out.print( datas );
            }
        }
    }
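
JDBCUtils never calls Class.forName, so it relies on JDBC 4 auto-loading of the MySQL connector on the classpath, and the static initializer expects a jdbc.properties file with jdbc.url, jdbc.username and jdbc.password entries. A small hedged usage sketch follows; the property values and the query are placeholders, not taken from the original post:

    package cn.tansun.bd.utils;

    import java.util.List;
    import java.util.Map;

    /** Hedged usage sketch for JDBCUtils; the values below are illustrative only. */
    public class JDBCUtilsDemo {
        // jdbc.properties on the classpath is expected to look roughly like:
        //   jdbc.url=jdbc:mysql://localhost:3306/archive
        //   jdbc.username=root
        //   jdbc.password=secret
        @SuppressWarnings("rawtypes")
        public static void main(String[] args) {
            List<Map> rows = JDBCUtils.selectDatas("SELECT tableName FROM archive_htable");
            for (Map row : rows) {
                System.out.println(row);   // each row is a columnName -> value map
            }
        }
    }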
    

      

  • Original post: https://www.cnblogs.com/zhanggl/p/5658517.html