1. Approach
HBase ships MapReduce integration in the form of the TableMapper and TableReducer base classes; to read from or write to an HBase table inside a MapReduce job, we extend the appropriate one. Because the input here is plain text on HDFS and only the output goes to HBase, this example uses an ordinary Mapper and extends only TableReducer.
2. Preparation
First, upload some test text to HDFS. This walkthrough reads it from /user/root/test.txt, the path the runner below passes to FileInputFormat.
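A minimal way to stage the input, assuming a local file named test.txt (the local file name here is illustrative):

hdfs dfs -mkdir -p /user/root
hdfs dfs -put test.txt /user/root/test.txt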
Then create a table in HBase; a single column family is enough. The code below uses table "wc" with column family "cf".
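From the HBase shell, that is:

create 'wc', 'cf'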
3. Implementation
The mapper:
package com.bjsxt.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each input line on spaces and emit (word, 1) for every token.
        String[] strs = value.toString().split(" ");
        for (String string : strs) {
            context.write(new Text(string), new IntWritable(1));
        }
    }
}
The reducer, which extends TableReducer:
package com.bjsxt.wc;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WCReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text text, Iterable<IntWritable> iterable, Context context)
            throws IOException, InterruptedException {
        // Sum the counts emitted by the mapper for this word.
        int sum = 0;
        for (IntWritable it : iterable) {
            sum += it.get();
        }
        // Row key = the word; the count is stored in column cf:ct.
        Put put = new Put(text.toString().getBytes());
        put.add("cf".getBytes(), "ct".getBytes(), (sum + "").getBytes());
        context.write(null, put);
    }
}
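Note that Put.add(byte[], byte[], byte[]) is the old API and was removed from the HBase 2.x client. On a newer client the equivalent lines would be (a sketch, assuming an HBase 2.x dependency; Bytes is org.apache.hadoop.hbase.util.Bytes):

// Same write expressed with the HBase 2.x client API.
Put put = new Put(Bytes.toBytes(text.toString()));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("ct"), Bytes.toBytes(String.valueOf(sum)));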
The driver. Note that the reducer is wired up with TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job, null, null, null, null, false); the final false argument disables addDependencyJars.
package com.bjsxt.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WCRunner {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node01:8020");
        conf.set("hbase.zookeeper.quorum", "node02,node03,node04");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WCRunner.class);

        // Mapper and its intermediate key/value types.
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer writes into the HBase table "wc".
        // The last argument (addDependencyJars) is false, so the job does
        // not try to ship the HBase jars along with it.
        TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job,
                null, null, null, null, false);

        FileInputFormat.addInputPath(job, new Path("/user/root/test.txt"));

        job.waitForCompletion(true);
    }
}
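Once the job completes, the word counts can be inspected from the HBase shell:

scan 'wc'

Each row key is a word, with its count stored in cf:ct.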