package cn.tansun.bd.hbase;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.tansun.bd.utils.JDBCUtils;

/**
 * @author zgl
 * @version Created: 2016-07-05 19:57:17
 */
public class HiveMySQl2HBaseMR extends Configured implements Tool {

    public static String tableName;
    public static String cf = null;
    public static String strRowkey = null;
    public static String strIndex = null;
    public static String column_name = null;
    public static String strColumn = null;
    private static Configuration conf = null;

    public static void main(String[] args) {
        getDatas();
        try {
            int runs = ToolRunner.run(new HiveMySQl2HBaseMR(), args);
            System.exit(runs);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Read the table name, column family, row-key spec and column (hive index + qualifier) metadata from MySQL
    @SuppressWarnings("rawtypes")
    public static List<Map> getDatas() {
        String sql = "SELECT DISTINCT s.tableName, ar.rowkey, af.column_family, aq.column_hive_index, aq.column_name FROM "
                + " archive_htable s, archive_hrowkey ar, archive_hfamily af, archive_hqualifier aq WHERE "
                + " s.rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id AND s.tableName = '2'";
        List<Map> selectDatas = JDBCUtils.selectDatas(sql);
        for (Map<String, String> metaData : selectDatas) {
            if (null == tableName) {
                tableName = metaData.get("tableName");
            }
            if (null == cf) {
                cf = metaData.get("column_family");
            }
            if (null == strRowkey) {
                strRowkey = metaData.get("rowkey");
            }
            String strTempIndex = metaData.get("column_hive_index");
            String strTempName = metaData.get("column_name");
            // Accumulate "hiveIndex qualifierName" pairs separated by commas
            if (null == strColumn || "".equals(strColumn)) {
                strColumn = strTempIndex + " " + strTempName;
            } else {
                strColumn = strColumn + "," + strTempIndex + " " + strTempName;
            }
        }
        return selectDatas;
    }

    @SuppressWarnings("deprecation")
    public int run(String[] args) throws Exception {
        /*
         * if (args.length != 3) {
         *     System.err.println("Usage: HiveMySQl2HBaseMR <table_name> <data_input_path> <hfile_output_path>");
         *     System.exit(-1);
         * }
         */
        conf = new Configuration();
        conf.addResource("hbase-site.xml");
        String table = "2";
        String input = "hdfs://node11:9000/datas/hivedata5";
        String output = "hdfs://node11:9000/datas/out1";
        HTable htable;
        try {
            // Delete the intermediate output directory if it already exists before running the job
            try {
                FileSystem fs = FileSystem.get(URI.create(output), conf);
                fs.delete(new Path(output), true);
                fs.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            htable = new HTable(conf, table.getBytes());
            Job job = new Job(conf);
            job.setJobName("Generate HFile");
            job.setJarByClass(HiveMySQl2HBaseMR.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HiveMySQlMapper.class);
            FileInputFormat.setInputPaths(job, input);
            // Legacy property names for the map output key/value classes
            job.getConfiguration().set("mapred.mapoutput.key.class",
                    "org.apache.hadoop.hbase.io.ImmutableBytesWritable");
            job.getConfiguration().set("mapred.mapoutput.value.class",
                    "org.apache.hadoop.hbase.KeyValue");
            FileOutputFormat.setOutputPath(job, new Path(output));
            HFileOutputFormat2.configureIncrementalLoad(job, htable);
            try {
                job.waitForCompletion(true);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return 0;
    }

    public static class HiveMySQlMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        @Override
        protected void setup(
                Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            conf = new Configuration();
        }

        String tableName = HiveMySQl2HBaseMR.tableName;
        String cf = HiveMySQl2HBaseMR.cf;
        String rowKey = HiveMySQl2HBaseMR.strRowkey;
        String strColumnName = HiveMySQl2HBaseMR.column_name;
        String strColumn = HiveMySQl2HBaseMR.strColumn;
        String split = "001"; // separator inserted between concatenated row-key fields

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
                throws IOException, InterruptedException {
            // The row-key spec is a comma-separated list of field indexes; strip the commas
            // and join the corresponding field values to build the row key.
            String strRowKey = "";
            String[] datas = value.toString().split(" ");
            for (String strIndex : rowKey.split(",")) {
                if (null == strRowKey || "".equals(strRowKey)) {
                    strRowKey = datas[Integer.valueOf(strIndex)];
                } else {
                    strRowKey = strRowKey + split + datas[Integer.valueOf(strIndex)];
                }
            }
            for (String str : strColumn.split(",")) {
                String[] columnTuple = str.split(" ");
                String columnData = datas[Integer.valueOf(columnTuple[0])];
                String columnName = columnTuple[1];
                System.out.println(columnData + "columnDatacolumnData");
                // The map output key must carry the same row key as the emitted KeyValue
                ImmutableBytesWritable rk = new ImmutableBytesWritable(Bytes.toBytes(strRowKey));
                // KeyValue(byte[] row, byte[] family, byte[] qualifier, byte[] value)
                KeyValue kv = new KeyValue(Bytes.toBytes(strRowKey),
                        cf.getBytes(), Bytes.toBytes(columnName), Bytes.toBytes(columnData));
                context.write(rk, kv);
            }
        }
    }
}
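Generating the HFiles is only half of the bulk-load flow; they still have to be handed to HBase. Below is a minimal sketch of that second step, assuming the same HBase 0.98/1.x client API used above (LoadIncrementalHFiles plus the deprecated HTable constructor). The class name is illustrative; the table name and HFile directory are the values hard-coded in run().

package cn.tansun.bd.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// Illustrative class name; loads the HFiles produced by HiveMySQl2HBaseMR into the table
public class BulkLoadHFiles {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.addResource("hbase-site.xml");
        // Same hard-coded table name and HFile output directory as in run()
        HTable htable = new HTable(conf, "2".getBytes());
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path("hdfs://node11:9000/datas/out1"), htable);
        htable.close();
    }
}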
The JDBCUtils class:
package cn.tansun.bd.utils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
/**
* @author zgl
* @version Created: 2016-06-23 16:25:03
*/
public class JDBCUtils {
public JDBCUtils()
{
}
public static String PATH = "jdbc.properties";
public static Properties prop;
public static String url = null;
public static String username = null;
public static String password = null;
public static Connection conn;
public static Statement stmt;
public static ResultSet rs;
public static String fileName = null;
static {
try {
InputStream inputStream = JDBCUtils.class.getClassLoader().getResourceAsStream( PATH );
prop = new Properties();
// Guard against a missing jdbc.properties before loading, otherwise prop.load() throws a NullPointerException
if ( inputStream != null ) {
prop.load( inputStream );
url = prop.getProperty( "jdbc.url" );
username = prop.getProperty( "jdbc.username" );
password = prop.getProperty( "jdbc.password" );
inputStream.close();
}
}
catch ( IOException e ) {
e.printStackTrace();
}
}
public static void closeConnection( Connection conn ) {
if ( conn != null ) {
try {
conn.close();
}
catch ( SQLException e ) {
e.printStackTrace();
}
}
}
/**
* Run the given SQL query and return each result row as a column-name/value map.
*
* @param sql
* @return
*/
@SuppressWarnings( "rawtypes" )
public static List<Map> selectDatas( String sql ) {
List<Map> listDatas = new ArrayList<Map>();
try {
conn = DriverManager.getConnection( url, username, password );
conn.setAutoCommit( false );
// A plain Statement is sufficient for the SELECT queries issued here
stmt = conn.createStatement();
rs = stmt.executeQuery( sql );
if ( rs != null ) {
ResultSetMetaData metaData = rs.getMetaData();
int count = metaData.getColumnCount();
Map<String, Object> map = null;
while ( rs.next() ) {
map = new HashMap<String, Object>();
for ( int i = 1; i < count + 1; i++ ) {
map.put( metaData.getColumnName( i ), rs.getObject( i ) );
}
listDatas.add( map );
}
}
}
catch ( SQLException e ) {
e.printStackTrace();
}
finally {
closeConnection( conn );
}
return listDatas;
}
/**
* Run the query and collect every column value of every row into a flat list of strings.
*
* @param sql
* @return
*/
public static List<String> getStrMap( String sql) {
List<String> strList = new ArrayList<String>();
try {
conn = DriverManager.getConnection( url, username, password );
conn.setAutoCommit( false );
// A plain Statement is sufficient for the SELECT queries issued here
stmt = conn.createStatement();
rs = stmt.executeQuery( sql );
if ( rs != null ) {
ResultSetMetaData metaData = rs.getMetaData();
int count = metaData.getColumnCount();
while (rs.next()){
for (int i = 1; i < count + 1; i++){
//String str1 = metaData.getColumnName( i );
String str2 = (String) rs.getObject( i );
strList.add(str2);
}
}
}
}
catch ( SQLException e ) {
e.printStackTrace();
}
finally {
closeConnection( conn );
}
return strList;
}
public static String table_name = null;
public static String rowkey = null;
public static String column_family = null;
public static String column_name = null;
private static String rows = null;
public static String sql = null;
public static String sql2 = null;
@SuppressWarnings( "rawtypes" )
public static void main( String[] args ) {
sql2 = "SELECT GROUP_CONCAT( DISTINCT aq.column_hive_index,' ', aq.column_name ,' ' ORDER BY "
+ " aq.column_hive_index SEPARATOR ',' ) AS column_names FROM archive_hqualifier aq "
+ "where aq.table_id = 77 GROUP BY aq.column_name ORDER BY aq.column_hive_index";
sql ="SELECT DISTINCT s.tableName, ar.rowkey, af.column_family, "
+ "aq.column_name FROM archive_htable s,archive_hrowkey ar,archive_hfamily af,"
+ " archive_hqualifier aq "
+ "WHERE s .rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id "
+ "AND af.qualifier_id = aq.qualifier_id;";
String datas = null;
List<String> strList = getStrMap(sql);
String substring = null;
for (int i = 0; i < strList.size(); i++){
datas = strList.get(i);
//datas = strList.get(i).substring(0, strList.get(i).length()-1);
System.out.print(datas);
}
}
}
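For reference, here is a minimal sketch of how JDBCUtils is configured and called. The jdbc.* keys are the ones read by the static initializer above and the SELECT statement is the one from main(); the demo class name and the placeholder connection values are illustrative.

package cn.tansun.bd.utils;

import java.util.List;
import java.util.Map;

// Illustrative class name; shows one way to call JDBCUtils.selectDatas
public class JDBCUtilsDemo {

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) {
        // jdbc.properties must be on the classpath and provide the keys read by JDBCUtils:
        //   jdbc.url=jdbc:mysql://<host>:3306/<database>
        //   jdbc.username=<user>
        //   jdbc.password=<password>
        String sql = "SELECT DISTINCT s.tableName, ar.rowkey, af.column_family, aq.column_name "
                + "FROM archive_htable s, archive_hrowkey ar, archive_hfamily af, archive_hqualifier aq "
                + "WHERE s.rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id "
                + "AND af.qualifier_id = aq.qualifier_id";
        List<Map> rows = JDBCUtils.selectDatas(sql);
        for (Map row : rows) {
            // Each entry maps a column name to its value for one result row
            System.out.println(row);
        }
    }
}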