  • Reading Hive files and importing the data into HBase
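    The flow: job metadata (the HBase table name, column family, rowkey field indices, and column definitions) is first read from MySQL via JDBCUtils; a MapReduce job then reads the Hive table's files from HDFS and emits KeyValues through HFileOutputFormat2, producing HFiles ready for bulk loading (a bulk-load sketch follows the job code below). For example, with rowkey indices "0,1" and a column definition "2<tab>name", the mapper turns the tab-separated input line `a01<tab>b01<tab>zhang` into the row key `a01001b01` (the parts joined by the `split` string "001") and one KeyValue for the qualifier `name`.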

      

    package cn.tansun.bd.hbase;

    import java.io.IOException;
    import java.net.URI;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import cn.tansun.bd.utils.JDBCUtils;

    /**
     * @author zgl
     * @version created 2016-07-05 19:57
     */
    public class HiveMySQl2HBaseMR extends Configured implements Tool {

        public static String tableName;
        public static String cf = null;
        public static String strRowkey = null;
        public static String strIndex = null;
        public static String column_name = null;
        public static String strColumn = null;
        private static Configuration conf = null;

        public static void main(String[] args) {
            getDatas();
            try {
                int runs = ToolRunner.run(new HiveMySQl2HBaseMR(), args);
                System.exit(runs);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Read the table name, column family, rowkey indices and column
        // definitions from MySQL.
        @SuppressWarnings("rawtypes")
        public static List<Map> getDatas() {
            String sql = "SELECT DISTINCT s.tableName, ar.rowkey, af.column_family, aq.column_hive_index, aq.column_name"
                    + " FROM archive_htable s, archive_hrowkey ar, archive_hfamily af, archive_hqualifier aq"
                    + " WHERE s.rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id AND s.tableName = '2'";
            List<Map> selectDatas = JDBCUtils.selectDatas(sql);
            for (Map<String, String> metaData : selectDatas) {
                if (null == tableName) {
                    tableName = metaData.get("tableName");
                }
                if (null == cf) {
                    cf = metaData.get("column_family");
                }
                if (null == strRowkey) {
                    strRowkey = metaData.get("rowkey");
                }

                // Accumulate comma-separated "hiveIndex\tcolumnName" pairs;
                // the mapper later splits on "," and "\t".
                String strTempIndex = metaData.get("column_hive_index");
                String strTempName = metaData.get("column_name");
                if (null == strColumn || "".equals(strColumn)) {
                    strColumn = strTempIndex + "\t" + strTempName;
                } else {
                    strColumn = strColumn + "," + strTempIndex + "\t" + strTempName;
                }
            }
            return selectDatas;
        }

        @SuppressWarnings("deprecation")
        public int run(String[] args) throws Exception {
            /*
             * if (args.length != 3) {
             *     System.err.println("Usage: HiveMySQl2HBaseMR <table_name> <data_input_path> <hfile_output_path>");
             *     System.exit(-1);
             * }
             */
            conf = new Configuration();
            conf.addResource("hbase-site.xml");

            String table = "2";
            String input = "hdfs://node11:9000/datas/hivedata5";
            String output = "hdfs://node11:9000/datas/out1";

            HTable htable;
            try {
                // Delete the intermediate output directory if it already
                // exists, so the job can be re-run.
                try {
                    FileSystem fs = FileSystem.get(URI.create(output), conf);
                    fs.delete(new Path(output), true);
                    fs.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }

                htable = new HTable(conf, table.getBytes());
                Job job = new Job(conf);
                job.setJobName("Generate HFile");

                job.setJarByClass(HiveMySQl2HBaseMR.class);
                job.setInputFormatClass(TextInputFormat.class);
                job.setMapperClass(HiveMySQlMapper.class);
                FileInputFormat.setInputPaths(job, input);

                job.getConfiguration().set("mapred.mapoutput.key.class",
                        "org.apache.hadoop.hbase.io.ImmutableBytesWritable");
                job.getConfiguration().set("mapred.mapoutput.value.class",
                        "org.apache.hadoop.hbase.KeyValue");

                FileOutputFormat.setOutputPath(job, new Path(output));

                // Sets the partitioner, reducer and output format needed to
                // produce HFiles matching the table's region boundaries.
                HFileOutputFormat2.configureIncrementalLoad(job, htable);

                try {
                    job.waitForCompletion(true);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return 0;
        }

        public static class HiveMySQlMapper extends
                Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

            @Override
            protected void setup(Context context)
                    throws IOException, InterruptedException {
                super.setup(context);
                conf = new Configuration();
            }

            String tableName = HiveMySQl2HBaseMR.tableName;
            String cf = HiveMySQl2HBaseMR.cf;
            String rowKey = HiveMySQl2HBaseMR.strRowkey;
            String strColumnName = HiveMySQl2HBaseMR.column_name;
            String strColumn = HiveMySQl2HBaseMR.strColumn;
            String split = "001";

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // rowKey holds comma-separated numeric field indices; pick
                // those fields out of the tab-separated input line and join
                // them with the split string to build the row key.
                String strRowKey = "";
                String[] datas = value.toString().split("\t");

                for (String strIndex : rowKey.split(",")) {
                    if ("".equals(strRowKey)) {
                        strRowKey = datas[Integer.valueOf(strIndex)];
                    } else {
                        strRowKey = strRowKey + split
                                + datas[Integer.valueOf(strIndex)];
                    }
                }

                for (String str : strColumn.split(",")) {
                    String[] columnTuple = str.split("\t");
                    String columnData = datas[Integer.valueOf(columnTuple[0])];
                    String columnName = columnTuple[1];
                    System.out.println("columnData: " + columnData); // debug output

                    // The map output key must carry the same row bytes as the
                    // KeyValue, e.g. "a01001b01", or the HFiles will not sort
                    // correctly.
                    ImmutableBytesWritable rk = new ImmutableBytesWritable(
                            Bytes.toBytes(strRowKey));
                    // KeyValue(byte[] row, byte[] family, byte[] qualifier, byte[] value)
                    KeyValue kv = new KeyValue(Bytes.toBytes(strRowKey),
                            cf.getBytes(), Bytes.toBytes(columnName),
                            Bytes.toBytes(columnData));
                    context.write(rk, kv);
                }
            }
        }
    }
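
    The job above only generates HFiles under the output directory; they still have to be handed over to the region servers. The original post stops there, so the following is a minimal sketch of that last step, assuming the same table name and output path as in run() and the classic HBase 0.98/1.x client API the rest of the code targets:

    package cn.tansun.bd.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadHFiles {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.addResource("hbase-site.xml");
            // Same table ("2") and HFile directory as in HiveMySQl2HBaseMR.run().
            HTable htable = new HTable(conf, "2");
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            // Moves each HFile into the region serving its key range.
            loader.doBulkLoad(new Path("hdfs://node11:9000/datas/out1"), htable);
            htable.close();
        }
    }

    Because doBulkLoad moves files rather than rewriting them, this step is fast and atomic per HFile; the data becomes visible in HBase as soon as the load returns.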

    The JDBCUtils class:

    package cn.tansun.bd.utils;

    import java.io.IOException;
    import java.io.InputStream;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    /**
     * @author zgl
     * @version created 2016-06-23 16:25
     */
    public class JDBCUtils {

        public JDBCUtils() {
        }

        public static String PATH = "jdbc.properties";
        public static Properties prop;
        public static String url = null;
        public static String username = null;
        public static String password = null;
        public static Connection conn;
        public static Statement stmt;
        public static ResultSet rs;
        public static String fileName = null;

        static {
            // Load the JDBC connection settings from jdbc.properties on the classpath.
            try {
                InputStream inputStream = JDBCUtils.class.getClassLoader().getResourceAsStream(PATH);
                prop = new Properties();
                prop.load(inputStream);
                url = prop.getProperty("jdbc.url");
                username = prop.getProperty("jdbc.username");
                password = prop.getProperty("jdbc.password");
                if (inputStream != null) {
                    inputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public static void closeConnection(Connection conn) {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Run the given SELECT statement and return each result row as a
         * column-name-to-value map.
         *
         * @param sql the SELECT statement to execute
         * @return one map per result row
         */
        @SuppressWarnings("rawtypes")
        public static List<Map> selectDatas(String sql) {
            List<Map> listDatas = new ArrayList<Map>();
            try {
                conn = DriverManager.getConnection(url, username, password);
                conn.setAutoCommit(false);
                stmt = conn.createStatement();
                rs = stmt.executeQuery(sql);
                if (rs != null) {
                    ResultSetMetaData metaData = rs.getMetaData();
                    int count = metaData.getColumnCount();
                    Map<String, Object> map = null;
                    while (rs.next()) {
                        map = new HashMap<String, Object>();
                        for (int i = 1; i < count + 1; i++) {
                            map.put(metaData.getColumnName(i), rs.getObject(i));
                        }
                        listDatas.add(map);
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return listDatas;
        }

        /**
         * Run the given SELECT statement and return all column values as a
         * flat list of strings, row by row.
         *
         * @param sql the SELECT statement to execute
         * @return the column values of every row
         */
        public static List<String> getStrMap(String sql) {
            List<String> strList = new ArrayList<String>();
            try {
                conn = DriverManager.getConnection(url, username, password);
                conn.setAutoCommit(false);
                stmt = conn.createStatement();
                rs = stmt.executeQuery(sql);
                if (rs != null) {
                    ResultSetMetaData metaData = rs.getMetaData();
                    int count = metaData.getColumnCount();
                    while (rs.next()) {
                        for (int i = 1; i < count + 1; i++) {
                            String str2 = (String) rs.getObject(i);
                            strList.add(str2);
                        }
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return strList;
        }

        public static String table_name = null;
        public static String rowkey = null;
        public static String column_family = null;
        public static String column_name = null;
        private static String rows = null;
        public static String sql = null;
        public static String sql2 = null;

        public static void main(String[] args) {
            sql2 = "SELECT GROUP_CONCAT(DISTINCT aq.column_hive_index, '  ', aq.column_name, ' '"
                    + " ORDER BY aq.column_hive_index SEPARATOR ',') AS column_names"
                    + " FROM archive_hqualifier aq"
                    + " WHERE aq.table_id = 77 GROUP BY aq.column_name ORDER BY aq.column_hive_index";
            sql = "SELECT DISTINCT s.tableName, ar.rowkey, af.column_family, aq.column_name"
                    + " FROM archive_htable s, archive_hrowkey ar, archive_hfamily af, archive_hqualifier aq"
                    + " WHERE s.rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id"
                    + " AND af.qualifier_id = aq.qualifier_id";
            String datas = null;
            List<String> strList = getStrMap(sql);
            for (int i = 0; i < strList.size(); i++) {
                datas = strList.get(i);
                System.out.print(datas);
            }
        }
    }
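
    JDBCUtils expects a jdbc.properties file on the classpath carrying the three keys read in the static initializer. A minimal sketch of that file, assuming a local MySQL instance and a database named `archive` (host, database name and credentials are placeholders):

    jdbc.url=jdbc:mysql://localhost:3306/archive?useUnicode=true&characterEncoding=UTF-8
    jdbc.username=root
    jdbc.password=secret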
    

      

  • Original post: https://www.cnblogs.com/zhanggl/p/5658517.html