I. Requirement 1: Count the rowkeys of a table
Here we use the official MapReduce jobs bundled with HBase.

1) Export the environment variables
export HBASE_HOME=/root/hd/hbase-1.3.0
export HADOOP_HOME=/root/hd/hadoop-2.8.4
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
These exports can also be added to hbase-env.sh.

2) Launch the HBase MapReduce job
cd /root/hd/hbase-1.3.0
/root/hd/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.3.0.jar rowcounter emp
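For a quick cross-check of the rowcounter output on a small table, the same count can also be obtained with a client-side scan. This is a minimal sketch, not part of the original steps; it assumes the emp table from above and an hbase-site.xml on the classpath, and uses FirstKeyOnlyFilter so only one cell per row is shipped back:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

public class LocalRowCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("emp"))) {
            Scan scan = new Scan();
            scan.setFilter(new FirstKeyOnlyFilter()); // fetch only the first cell of each row
            long count = 0;
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result ignored : scanner) {
                    count++;
                }
            }
            System.out.println("emp row count = " + count);
        }
    }
}

For large tables the MapReduce rowcounter remains the right tool; a single-client scan like this does not parallelize.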
II. Requirement 2: Import local data into HBase
Approach: HBase stores its data on HDFS, so first upload the file to HDFS, create the corresponding HBase table, and then load the data into the table with a MapReduce job (importtsv).

1) Create a directory in HDFS and upload the local data
hdfs dfs -mkdir /lovein
hdfs dfs -put /root/love.tsv /lovein

2) Create the table
create 'love','info'

3) Run the import
cd /root/hd/hbase-1.3.0
/root/hd/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.3.0.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:description love hdfs://hd09-1:9000/lovein/
Appendix: love.tsv (fields are separated by tabs, the default separator expected by importtsv)
001     zhangsan        henshuai
002     Dilireba        beautiful
003     Yangmi          good
004     isme            perfect
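After the importtsv job finishes, a single-row Get is a quick way to confirm that each TSV column landed in the expected cell. This is a minimal sketch, not part of the original steps; it assumes rowkey 001 from love.tsv above and an hbase-site.xml on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckImport {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("love"))) {
            Get get = new Get(Bytes.toBytes("001"));
            Result result = table.get(get);
            // importtsv mapped the second TSV column to info:name and the third to info:description
            String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            String desc = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("description")));
            System.out.println("001 -> name=" + name + ", description=" + desc);
        }
    }
}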
III. Requirement 3: Filter specified columns from the love table and load them into the lovemr table
This time we write a custom HBase MapReduce job instead of using the bundled ones.

1) Build a Mapper class that reads the data from the love table
2) Build a Reducer class that writes that data into the lovemr table
3) Build the driver class
4) Package the jar and run the job on the cluster
5) Create the target table
create 'lovemr','info'
6) Run the job
Change into the directory containing HbaseTest-1.0-SNAPSHOT.jar, then:
/root/hd/hadoop-2.8.4/bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.hbase.mr.LoveDriver
1. The ReadLoveMapper class
package com.hbase.mr;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class ReadLoveMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // 1. One call per rowkey; build a Put keyed on the same rowkey
        Put put = new Put(key.get());
        // 2. Iterate over the cells of this row
        for (Cell c : value.rawCells()) {
            // 3. Keep only cells from the info column family
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(c)))) {
                // 4. Keep only the name column; those cells go into the lovemr table
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
                    put.add(c);
                }
            }
        }
        // Skip rows without an info:name cell; writing an empty Put would fail
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}
2. The WriteLoveReducer class
package com.hbase.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class WriteLoveReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Pass every Put through; TableOutputFormat writes them into the lovemr table
        for (Put p : values) {
            context.write(NullWritable.get(), p);
        }
    }
}
3. The LoveDriver class
package com.hbase.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LoveDriver implements Tool {

    private Configuration conf;

    // Business logic
    public int run(String[] strings) throws Exception {
        // 1. Create the job
        Job job = Job.getInstance(conf);
        // 2. Set the main class
        job.setJarByClass(LoveDriver.class);
        // 3. Configure the job: read the source table with a Scan
        Scan scan = new Scan();
        // 4. Set the mapper
        TableMapReduceUtil.initTableMapperJob("love",
                scan,
                ReadLoveMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);
        // 5. Set the reducer
        TableMapReduceUtil.initTableReducerJob("lovemr",
                WriteLoveReducer.class,
                job);
        // Number of reduce tasks
        job.setNumReduceTasks(1);

        boolean rs = job.waitForCompletion(true);
        return rs ? 0 : 1;
    }

    // Set the configuration
    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create(configuration);
    }

    // Get the configuration
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new LoveDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
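As an alternative to filtering inside ReadLoveMapper, the Scan passed to initTableMapperJob can itself be narrowed so that only info:name cells are shipped to the map phase. The sketch below illustrates that option; the caching and cacheBlocks settings are common choices for MapReduce scans, not values taken from the original driver:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class NameOnlyScan {
    // Builds a Scan that only returns info:name cells to the mapper.
    public static Scan build() {
        Scan scan = new Scan();
        scan.setCaching(500);        // rows fetched per RPC; fewer round trips during a full scan
        scan.setCacheBlocks(false);  // don't fill the region server block cache during the MR scan
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // restrict to info:name
        return scan;
    }
}

With a Scan like this, the family and qualifier checks in the mapper become redundant, since the map phase only ever receives info:name cells.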
IV. Requirement 4: Write data from HDFS into HBase
Approach:
1) Build a Mapper that reads the data from HDFS
2) Build a Reducer
3) Build the driver class
4) Package and run
5) Test
6) Create a directory in HDFS and upload the local data
hdfs dfs -mkdir /lovehbase
hdfs dfs -put /root/love.tsv /lovehbase
7) Create the table
create 'lovehdfs','info'
8) Run the job
Change into the directory containing HbaseTest-1.0-SNAPSHOT.jar, then:
/root/hd/hadoop-2.8.4/bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.hbase.mr2.LoveDriver
1. The ReadLoveFromHDFSMapper class
package com.hbase.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReadLoveFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read one line
        String line = value.toString();

        // 2. Split the line; love.tsv is tab-separated
        String[] fields = line.split("\t");

        // 3. Extract the fields
        byte[] rowkey = Bytes.toBytes(fields[0]);
        byte[] name = Bytes.toBytes(fields[1]);
        byte[] desc = Bytes.toBytes(fields[2]);

        // Build the Put object
        Put put = new Put(rowkey);
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), name);
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("desc"), desc);

        // 4. Emit to the reducer
        context.write(new ImmutableBytesWritable(rowkey), put);
    }
}
2. The WriteLoveReducer class
package com.hbase.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class WriteLoveReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Pass every Put through; TableOutputFormat writes them into the lovehdfs table
        for (Put p : values) {
            context.write(NullWritable.get(), p);
        }
    }
}
3. The LoveDriver class
package com.hbase.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LoveDriver implements Tool {

    private Configuration conf = null;

    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create(configuration);
    }

    public Configuration getConf() {
        return this.conf;
    }

    public int run(String[] strings) throws Exception {
        // 1. Create the job
        Job job = Job.getInstance(conf);
        job.setJarByClass(LoveDriver.class);

        // 2. Configure the mapper
        job.setMapperClass(ReadLoveFromHDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // 3. Configure the reducer: write into the lovehdfs table
        TableMapReduceUtil.initTableReducerJob("lovehdfs", WriteLoveReducer.class, job);

        // 4. Configure the input path
        FileInputFormat.addInputPath(job, new Path("/lovehbase/"));

        // 5. Run
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new LoveDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
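Once the job has run, the lovehdfs table can be checked from a small client program rather than the HBase shell. This is a minimal sketch, not part of the original walkthrough; it assumes the info:name and info:desc qualifiers written by the mapper above and an hbase-site.xml on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckLoveHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("lovehdfs"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            // Print every row with the two columns the MR job wrote
            for (Result r : scanner) {
                String row = Bytes.toString(r.getRow());
                String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
                String desc = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("desc")));
                System.out.println(row + " -> name=" + name + ", desc=" + desc);
            }
        }
    }
}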