附录代码:
HBase---->HDFS
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.hbase.HBaseConfiguration; 6 import org.apache.hadoop.hbase.client.Result; 7 import org.apache.hadoop.hbase.client.Scan; 8 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 9 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 10 import org.apache.hadoop.hbase.mapreduce.TableMapper; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 17 public class HBase2HDFS { 18 19 public static void main(String[] args) throws Exception { 20 Configuration conf = HBaseConfiguration.create(); 21 Job job = Job.getInstance(conf, HBase2HDFS.class.getSimpleName()); 22 job.setJarByClass(HBase2HDFS.class); 23 //MR有输入和输出,输入一般是FileInputFormat等...但是在HBase中需要用到一个特殊的工具类是TableMapReduceUtil 24 TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), HBase2HDFSMapper.class, 25 Text.class, Text.class, job); 26 //HBase中的具体操作打到MR的job中. 27 TableMapReduceUtil.addDependencyJars(job); 28 job.setMapperClass(HBase2HDFSMapper.class); 29 job.setMapOutputKeyClass(Text.class); 30 job.setMapOutputValueClass(Text.class); 31 job.setOutputFormatClass(TextOutputFormat.class); 32 FileOutputFormat.setOutputPath(job, new Path(args[1])); 33 //FileOutputFormat.setOutputPath(job, new Path("/t1-out")); 34 job.setNumReduceTasks(0); 35 job.waitForCompletion(true); 36 37 38 } 39 static class HBase2HDFSMapper extends TableMapper<Text, Text>{ 40 private Text rowKeyText = new Text(); 41 private Text value = new Text(); 42 43 //这个TableMapper中的两个泛型是Map阶段的输出..HBase中的数据要想进入HBase,几乎都用引号引起来. 44 //TableMapper是Mapper类的一个子类.这个类用来定义前面的两个泛型参数. 45 @Override 46 protected void map( 47 ImmutableBytesWritable key, 48 Result result, 49 Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) 50 throws IOException, InterruptedException { 51 //结果都在result对象,用raw方法从result对象中找到数据. 这个raw()方法已经过时了. 52 /* 53 KeyValue[] raw = result.raw(); 54 for (KeyValue keyValue : raw) { 55 keyValue.getValue(); 56 } 57 */ 58 /* 59 * 想输出的数据格式如下: 1 zhangsan 13 (行键,name,age) 60 * 2 lisi 14 61 */ 62 63 //要想精确的获得某一列的值,要根据行键,列族,列的时间戳. 64 //getColumnLatestCell 是获得最新的时间戳的值 相当于时间戳已经定义好了. 65 byte[] nameBytes = result.getColumnLatestCell("cf".getBytes(), "name".getBytes()).getValue(); 66 byte[] ageBytes = result.getColumnLatestCell("cf".getBytes(), "age".getBytes()).getValue(); 67 68 rowKeyText.set(key.get()); 69 value.set(new String(nameBytes) + " " + new String(ageBytes)); 70 context.write(new Text(key.get()), value); 71 //这里已经把数据搞成了 1 name age 的形式....就不需要写Reduce 72 } 73 } 74 }
HDFS---->HBase 通过MR导入到HBase
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.hbase.HBaseConfiguration; 5 import org.apache.hadoop.hbase.client.Mutation; 6 import org.apache.hadoop.hbase.client.Put; 7 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 8 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; 9 import org.apache.hadoop.hbase.mapreduce.TableReducer; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.NullWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 19 public class HDFS2HBaseImport { 20 21 public static void main(String[] args) throws Exception { 22 Configuration conf = HBaseConfiguration.create(); 23 conf.set(TableOutputFormat.OUTPUT_TABLE, args[0]); 24 25 Job job = Job.getInstance(conf, HDFS2HBaseImport.class.getSimpleName()); 26 job.setJarByClass(HDFS2HBaseImport.class); 27 28 //数据到底放到哪一张表中,还是要用到TableMapReduceUtil类. 29 TableMapReduceUtil.addDependencyJars(job); 30 job.setMapperClass(HDFS2HBaseMapper.class); 31 job.setMapOutputKeyClass(Text.class); 32 job.setMapOutputValueClass(Text.class); 33 job.setOutputFormatClass(TextOutputFormat.class); 34 job.setReducerClass(HDFS2HBaseReducer.class); 35 job.setOutputFormatClass(TableOutputFormat.class); 36 FileInputFormat.setInputPaths(job, args[1]); 37 job.waitForCompletion(true); 38 } 39 40 static class HDFS2HBaseMapper extends Mapper<LongWritable, Text, Text, Text>{ 41 private Text rowKeyText = new Text(); 42 private Text value = new Text(); 43 44 @Override 45 protected void map(LongWritable key, Text text, 46 Mapper<LongWritable, Text, Text, Text>.Context context) 47 throws IOException, InterruptedException { 48 String[] splits = text.toString().split(" "); 49 rowKeyText.set(splits[0]); 50 value.set(splits[1] + " " + splits[2]);//name age 51 context.write(rowKeyText, value); 52 } 53 } 54 //Reduce继承的是和在导出的时候Map extends TableMapper 对应的 因为导入的是HBase中,所以后面的参数用NullWritable代替 55 static class HDFS2HBaseReducer extends TableReducer<Text, Text, NullWritable> { 56 @Override 57 protected void reduce(Text k2, Iterable<Text> v2s, 58 Reducer<Text, Text, NullWritable, Mutation>.Context context) 59 throws IOException, InterruptedException { 60 //向HBase中插入数据一定要用到Put对象. 61 Put put = new Put(k2.getBytes()); 62 63 for (Text text : v2s) { 64 String[] splits = text.toString().split(" "); 65 //加载列和对应的值 66 put.add("cf".getBytes(), "name".getBytes(), splits[0].getBytes()); 67 put.add("cf".getBytes(), "age".getBytes(), splits[1].getBytes()); 68 context.write(NullWritable.get(), put);//一个参数是key,一个是对应的value. 69 //导入HBase不需要key...直接用NullWritable对象和封装好数据的put对象. 70 } 71 } 72 } 73 }