Goal: write data stored on HDFS into an HBase table.
When Hadoop MapReduce output has to be loaded into an HBase table, the most efficient route for large data sets is usually to write HFiles first and then bulk-load them, because HFile is HBase's internal storage format. For simplicity, the example in this post instead writes Puts directly into the table through TableOutputFormat (the bulk-load route is sketched briefly below for reference). Let's see how to do it.
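A minimal sketch of the HFile/bulk-load route might look like the following. The class name, output path, and the single illustrative column are placeholders, not part of the example that follows; the cluster addresses are the same ones used later in this post.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoad2Hbase {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.124,192.168.1.125,192.168.1.126");

        Job job = Job.getInstance(conf, BulkLoad2Hbase.class.getSimpleName());
        job.setJarByClass(BulkLoad2Hbase.class);
        job.setMapperClass(HFileMapper.class);
        // Map output must be (row key, Put); configureIncrementalLoad below wires up
        // the sorting reducer, the partitioner and HFileOutputFormat2 to match t1's regions.
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.123:9000/mytest/*"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.123:9000/hfile_out"));

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("t1"));
             RegionLocator locator = conn.getRegionLocator(TableName.valueOf("t1"))) {
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
            job.waitForCompletion(true);
        }
        // Then load the generated HFiles into t1, e.g. with the completebulkload tool:
        //   hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /hfile_out t1
        // (the tool's class name/location varies between HBase versions)
    }

    // Illustrative mapper: key each line's Put by its first field.
    static class HFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(" ");
            Put put = new Put(split[0].getBytes());
            put.addColumn("cf".getBytes(), "oneColumn".getBytes(), value.toString().getBytes());
            context.write(new ImmutableBytesWritable(split[0].getBytes()), put);
        }
    }
}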
1. We have a text file on HDFS:
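The exact file contents are not reproduced here; judging from how the reducer below splits each line, assume space-separated fields in the order id name age addr, for example (illustrative values only):

1001 zhangsan 20 wenzhou
1002 lisi 21 hangzhou
1003 wangwu 22 shanghai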
2. Create a table t1 in HBase.
Create statement: create 't1','cf'
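You can confirm the table and its single column family cf from the HBase shell, e.g.:

list
describe 't1'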
3. Write the MR job.
package cn.tendency.wenzhouhbase.hadoop;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class Hadoop2Hbase {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // HBase cluster connection info
        conf.set("hbase.zookeeper.quorum", "192.168.1.124,192.168.1.125,192.168.1.126");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.master.port", "60000");
        conf.set("hbase.rootdir", "hdfs://192.168.1.122:9000/hbase");
        // Target table for TableOutputFormat
        conf.set(TableOutputFormat.OUTPUT_TABLE, "t1");

        Job job = new Job(conf, Hadoop2Hbase.class.getSimpleName());
        // Ship the HBase client jars to the cluster along with the job
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(Hadoop2Hbase.class);

        job.setMapperClass(HbaseMapper.class);
        job.setReducerClass(HbaseReducer.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Read plain text from HDFS, write Puts into HBase
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.setInputPaths(job, "hdfs://192.168.1.123:9000/mytest/*");
        job.waitForCompletion(true);
    }

    // Prefixes each line with a row key built from its first field plus a timestamp,
    // then re-emits the whole line so the reducer can split it into columns.
    static class HbaseMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws IOException, InterruptedException {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
            String[] split = value.toString().split(" ");
            context.write(key,
                    new Text(split[0] + sdf.format(Calendar.getInstance().getTime())
                            + " " + value.toString()));
        }
    }

    // Turns each record into one Put: split[0] (first field + timestamp) is the row key,
    // the remaining fields go into column family "cf".
    static class HbaseReducer extends
            TableReducer<LongWritable, Text, NullWritable> {
        @Override
        protected void reduce(LongWritable key, Iterable<Text> values,
                Reducer<LongWritable, Text, NullWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                String[] split = text.toString().split(" ");
                Put put = new Put(split[0].getBytes());
                put.addColumn("cf".getBytes(), "oneColumn".getBytes(), text.toString().getBytes());
                put.addColumn("cf".getBytes(), "id".getBytes(), split[1].getBytes());
                put.addColumn("cf".getBytes(), "name".getBytes(), split[2].getBytes());
                put.addColumn("cf".getBytes(), "age".getBytes(), split[3].getBytes());
                // put.addColumn("cf".getBytes(), "addr".getBytes(), split[4].getBytes());
                context.write(NullWritable.get(), put);
            }
        }
    }
}
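To run the job, make the HBase client jars visible to the hadoop command, submit the packaged jar (the jar name below is illustrative), and then check the rows from the HBase shell:

export HADOOP_CLASSPATH=$(hbase mapredcp):$HADOOP_CLASSPATH
hadoop jar hadoop2hbase.jar cn.tendency.wenzhouhbase.hadoop.Hadoop2Hbase

# in the HBase shell
scan 't1'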