Project source code: https://github.com/cw1322311203/hbasedemo/tree/master/hbase-mr
Through HBase's Java API, we can implement MapReduce jobs that interact with HBase, for example using MapReduce to import data from the local file system into an HBase table, or reading raw data out of HBase and analyzing it with MapReduce.
1. Official HBase-MapReduce
- View the jar dependencies required to run HBase MapReduce tasks (run from the HBase home directory):
$ bin/hbase mapredcp
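The command prints a colon-separated list of the jars MapReduce needs in order to talk to HBase. The exact entries depend on the HBase version; the output looks roughly like this (illustrative, abbreviated):

```
/opt/module/hbase-1.3.1/lib/zookeeper-3.4.6.jar:/opt/module/hbase-1.3.1/lib/hbase-common-1.3.1.jar:/opt/module/hbase-1.3.1/lib/hbase-client-1.3.1.jar:...
```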
- Importing environment variables
- Temporary import (takes effect in the current shell session only; run the following on the command line):

```bash
$ export HBASE_HOME=/opt/module/hbase-1.3.1
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
```
- Permanent: configure in /etc/profile

```bash
export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_HOME=/opt/module/hadoop-2.7.2
```

and in hadoop-env.sh add (note: place it after the for loop):

```bash
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
```
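To verify the configuration took effect (a minimal check, assuming the paths above), inspect Hadoop's effective classpath:

```bash
# Print the effective classpath one entry per line and look for HBase jars
hadoop classpath | tr ':' '\n' | grep -i hbase
```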
- Running the official MapReduce tasks

Case 1: count how many rows the student table contains
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
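If the student table does not already exist (it comes from earlier chapters), a minimal version can be created and populated in the HBase shell, for example (names are illustrative):

```
hbase(main):001:0> create 'student','info'
hbase(main):002:0> put 'student','1001','info:name','Thomas'
hbase(main):003:0> put 'student','1002','info:name','Janna'
```

When the job finishes, the row count is reported in the job counters as a counter named ROWS.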
Case 2: import local data into HBase with MapReduce
- Create a tsv-format file locally: fruit.tsv (fields are tab-separated)

```
1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow
```
- Create the HBase table
hbase(main):001:0> create 'fruit','info'
- Create the input_fruit directory in HDFS and upload fruit.tsv

```bash
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
```
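To confirm the upload, the file can be read back from HDFS:

```bash
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -cat /input_fruit/fruit.tsv
```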
- Run the MapReduce job to load the data into HBase's fruit table

```bash
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
  -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color \
  fruit hdfs://hadoop102:9000/input_fruit

# CDH-version command
$ /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar \
  /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase/lib/hbase-server-1.2.0-cdh5.16.2.jar importtsv \
  -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color \
  fruit hdfs://cm1.cdh.com:8020/input_fruit
```
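For larger imports, importtsv can also generate HFiles into a staging directory first and bulk-load them into the table, bypassing the normal write path. A sketch under the same cluster assumptions (the /output_fruit staging path is a placeholder):

```bash
# Step 1: write HFiles to a staging directory instead of issuing Puts
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
  -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color \
  -Dimporttsv.bulk.output=hdfs://hadoop102:9000/output_fruit \
  fruit hdfs://hadoop102:9000/input_fruit

# Step 2: load the generated HFiles into the table's regions
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar completebulkload \
  hdfs://hadoop102:9000/output_fruit fruit
```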
- View the imported results with the scan command
hbase(main):001:0> scan 'fruit'
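The output should look roughly like the following (timestamps and timings elided):

```
ROW          COLUMN+CELL
 1001        column=info:color, timestamp=..., value=Red
 1001        column=info:name, timestamp=..., value=Apple
 1002        column=info:color, timestamp=..., value=Yellow
 1002        column=info:name, timestamp=..., value=Pear
 1003        column=info:color, timestamp=..., value=Yellow
 1003        column=info:name, timestamp=..., value=Pineapple
3 row(s)
```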
2. Integrating HBase with MapReduce in a CDH environment
- Configure environment variables (on every machine)

```bash
vim /etc/profile

export HBASE_HOME=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase
export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop

source /etc/profile
```
- Modify hadoop-env.sh

```bash
vim /etc/hadoop/conf/hadoop-env.sh

# Add on every machine
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase/lib/*

source /etc/hadoop/conf/hadoop-env.sh
```
- Test

Restart the cluster first, then run:

```bash
cd /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8
/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar \
  /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase/lib/hbase-server-1.2.0-cdh5.16.2.jar rowcounter student
```
3. Custom HBase-MapReduce 1
Goal: use MapReduce to import data stored on HDFS into an HBase table.
Step-by-step implementation:
- Build the FruitMapper class, which reads the input data from HDFS
```java
package com.cw.bigdata.mr1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Identity mapper: pass each input line through unchanged; parsing happens in the reducer
        context.write(key, value);
    }
}
```
- Build the FruitReducer class, which writes the parsed rows into the fruit1 table in HBase
```java
package com.cw.bigdata.mr1;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 1. Iterate over the values, e.g. "1001	Apple	Red"
        for (Text value : values) {
            // 2. Split each line on the tab separator (the input is a tsv file)
            String[] fields = value.toString().split("\t");

            // 3. Build the Put object, using the first field as the row key
            Put put = new Put(Bytes.toBytes(fields[0]));

            // 4. Populate the Put object
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));

            // 5. Write out
            context.write(NullWritable.get(), put);
        }
    }
}
```
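Note that the reducer's output key type is NullWritable: when writing into HBase through a TableReducer, the row key travels inside the Put object itself, so the MapReduce output key is never used.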
- Build FruitDriver (implements Tool) to assemble and run the job
```java
package com.cw.bigdata.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FruitDriver implements Tool {

    // The job configuration
    private Configuration configuration = null;

    public int run(String[] args) throws Exception {
        // 1. Get the Job object
        Job job = Job.getInstance(configuration);

        // 2. Set the driver class path
        job.setJarByClass(FruitDriver.class);

        // 3. Set the Mapper and its output KV types
        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 4. Set the Reducer class (args[1] is the target HBase table)
        TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);

        // 5. Set the input path (args[0] is the HDFS input path)
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 6. Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {
        try {
            Configuration configuration = new Configuration();
            int run = ToolRunner.run(configuration, new FruitDriver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
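Because the driver implements Tool and is launched through ToolRunner, generic Hadoop options (for example -D key=value settings) passed on the command line are parsed and merged into the Configuration before run() is invoked.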
- Package and run the job
```bash
$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.cw.bigdata.mr1.FruitDriver /input_fruit/fruit.tsv fruit1

# CDH-version command
# fruit1 is the target HBase table; /input_fruit/fruit.tsv is the input data on HDFS
$ /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /root/jars/hbaseapi-1.0.jar com.cw.bigdata.mr1.FruitDriver /input_fruit/fruit.tsv fruit1
```
Tip: if the target table does not exist, create it before running the job.
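For example, for the fruit1 table used above (the reducer writes into the info column family):

```
hbase(main):001:0> create 'fruit1','info'
```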
Tip: maven package commands: -P local clean package or -P dev clean package install (bundles third-party jars; requires the maven-shade-plugin).
4. Custom HBase-MapReduce 2
Goal: migrate part of the data in HBase's fruit1 table (the name column) into the fruit2 table via MapReduce.
- Build the Fruit2Mapper class, which reads the data from the fruit1 table
```java
package com.cw.bigdata.mr2;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Build the Put object from the current row key
        Put put = new Put(key.get());

        // 1. Walk the cells of the current row
        for (Cell cell : value.rawCells()) {
            // 2. Keep only the cells whose qualifier is "name"
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                // 3. Add the cell to the Put object
                put.add(cell);
            }
        }

        // 4. Write out
        context.write(key, put);
    }
}
```
- Build the Fruit2Reducer class, which writes the data read from fruit1 into the fruit2 table
```java
package com.cw.bigdata.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Write out every Put received for this row key
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
```
- Build Fruit2Driver (implements Tool) to assemble and run the job
```java
package com.cw.bigdata.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2Driver implements Tool {

    // The job configuration
    private Configuration configuration = null;

    public int run(String[] args) throws Exception {
        // 1. Get the Job object
        Job job = Job.getInstance(configuration);

        // 2. Set the main class path
        job.setJarByClass(Fruit2Driver.class);

        // 3. Set the Mapper and its output KV types (reads from the fruit1 table)
        TableMapReduceUtil.initTableMapperJob("fruit1",
                new Scan(),
                Fruit2Mapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);

        // 4. Set the Reducer and the output table
        TableMapReduceUtil.initTableReducerJob("fruit2", Fruit2Reducer.class, job);

        // 5. Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {
        try {
            Configuration configuration = HBaseConfiguration.create();
            ToolRunner.run(configuration, new Fruit2Driver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
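Note that main() here uses HBaseConfiguration.create() rather than new Configuration(): it loads hbase-default.xml plus any hbase-site.xml found on the classpath, so with an hbase-site.xml on the classpath this job can also be launched directly from an IDE for local testing.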
- Package and run the job
```bash
$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.cw.bigdata.mr2.Fruit2Driver

# CDH-version command (the source table fruit1 and target table fruit2 are hard-coded in the driver)
$ /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /root/jars/hbaseapi-1.0.jar com.cw.bigdata.mr2.Fruit2Driver
```
Tip: if the target table does not exist, create it before running the job.
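For example, for the fruit2 table (the mapper keeps cells from the info:name column, so the info family must exist):

```
hbase(main):001:0> create 'fruit2','info'
```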
Tip: maven package commands: -P local clean package or -P dev clean package install (bundles third-party jars; requires the maven-shade-plugin).