Runner类
实现将 fruit 表中的一部分数据(info:name 与 info:color 两列),通过 MapReduce 迁入到 fruit_mr 表中。
package com.yjsj.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Driver that assembles and runs a table-to-table MapReduce job copying the
 * info:name and info:color columns of every row in the HBase table "fruit"
 * into the table "fruit_mr".
 */
class Fruit2FruitMRRunner extends Configured implements Tool {

    /**
     * Builds and runs the job.
     *
     * @param args unused command-line arguments
     * @return 0 on success (failure raises IOException instead of returning)
     * @throws IOException if the job completes unsuccessfully
     * @throws Exception   on job setup/submission errors
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(Fruit2FruitMRRunner.class);

        // Scan tuning: a full-table scan would churn the block cache, so
        // disable caching of blocks; fetch 500 rows per RPC to cut round trips.
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);

        // Mapper setup — note this is the mapreduce-package utility class,
        // not the legacy mapred-package one.
        TableMapReduceUtil.initTableMapperJob(
                "fruit",                      // source table
                scan,                         // scan controlling the read
                ReadFruitMapper.class,        // mapper class
                ImmutableBytesWritable.class, // mapper output key type
                Put.class,                    // mapper output value type
                job);

        // Reducer writes into the destination table.
        TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);

        // At least one reduce task is required by initTableReducerJob.
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        // The original returned `isSuccess ? 0 : 1` here, but after the throw
        // above isSuccess is always true — the ternary was dead code.
        return 0;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,node1,node2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.master", "master:60000");
        // No (Tool) cast needed: Fruit2FruitMRRunner implements Tool directly.
        int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
        System.exit(status);
    }
}
Mapper类
package com.yjsj.hbase_mr;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Mapper that reads each row of the source "fruit" table and emits a Put
 * containing only that row's info:name and info:color cells.
 *
 * <p>NOTE(review): the pasted source had editor line numbers ("1 ", "2 3 ", …)
 * interleaved with the code, which would not compile; they are removed here.
 */
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Converts one scanned row into a Put keyed by the same row key.
     *
     * @param key     the row key of the scanned row
     * @param value   all cells of the row delivered by the Scan
     * @param context used to emit (rowKey, Put) pairs
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // One Put per source row, keyed by the original row key.
        Put put = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            // Keep only cells from the "info" column family.
            if (!"info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                continue;
            }
            // Decode the qualifier once (the original decoded it twice).
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            // Copy only the "name" and "color" columns into the output Put.
            if ("name".equals(qualifier) || "color".equals(qualifier)) {
                put.add(cell);
            }
        }
        // Emit the row as the map output; the reducer writes it to fruit_mr.
        context.write(key, put);
    }
}
Reduce类
package com.yjsj.hbase_mr;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

/**
 * Reducer that forwards every incoming Put unchanged into the destination
 * "fruit_mr" table. The table sink ignores the output key, so NullWritable
 * is emitted for it.
 */
public class WriteFruitMRReducer
        extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Writes each Put gathered for this row key straight through to the
     * output table.
     *
     * @param key     the row key emitted by the mapper (not written out)
     * @param values  the Puts produced by the mapper for this key
     * @param context sink that persists the Puts into fruit_mr
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (Put rowPut : values) {
            context.write(NullWritable.get(), rowPut);
        }
    }
}