HBase does not offer the rich query and aggregation features of a relational database. This article shows how to use MapReduce to count how many times each cell value occurs in an HBase table and to write the results into a target table.
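The reducer and driver below write their results into a table named test with a column family cf, which must already exist. A minimal sketch for creating it with the old-style HBaseAdmin client used throughout this post (the class name CreateSummaryTable is an assumption; the table name test and family cf come from the reducer and driver below):

package com.datacenter.HbaseMapReduce.Summary;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Creates the output table "test" with column family "cf" if it does not exist yet.
public class CreateSummaryTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (!admin.tableExists("test")) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("test"));
                desc.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}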
(1) The Mapper implementation
package com.datacenter.HbaseMapReduce.Summary;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Text and IntWritable are the types this mapper writes to its context.
public class SummaryMapper extends TableMapper<Text, IntWritable> {
    public static final byte[] CF = "cf".getBytes();
    public static final byte[] ATTR1 = "attr1".getBytes();

    private final IntWritable ONE = new IntWritable(1);
    private Text text = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        /*
        // Count only the value of a specific column family and qualifier;
        // adjust to your actual schema as needed.
        byte[] ss = value.getValue(CF, ATTR1);
        String val = new String(ss);
        text.set(val); // we can only emit Writables
        context.write(text, ONE);
        */

        // Count the values of all column families and columns.
        try {
            DealResult(value, context);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Walk every column family, column and version in the Result and emit each cell value once.
    public void DealResult(Result rs, Context context) throws Exception {
        if (rs.isEmpty()) {
            System.out.println("result is empty!");
            return;
        }
        // family -> (qualifier -> (timestamp -> value))
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> tableResult = rs.getMap();
        String rowkey = Bytes.toString(rs.getRow()); // obtain the row key
        for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyResult : tableResult.entrySet()) {
            for (Entry<byte[], NavigableMap<Long, byte[]>> columnResult : familyResult.getValue().entrySet()) {
                for (Entry<Long, byte[]> valueResult : columnResult.getValue().entrySet()) {
                    text.set(Bytes.toString(valueResult.getValue()));
                    context.write(text, ONE);
                }
            }
        }
    }
}
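The commented-out block at the top of map() counts only one specific column. A minimal sketch of that single-column variant, with a null check added (an assumption: rows may lack the cf:attr1 cell and should then be skipped):

// Single-column variant: count occurrences of the value stored in cf:attr1 only.
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
    byte[] raw = value.getValue(CF, ATTR1);
    if (raw == null) {
        return; // this row has no cf:attr1 cell
    }
    text.set(Bytes.toString(raw));
    context.write(text, ONE);
}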
(2) The Reducer implementation
package com.datacenter.HbaseMapReduce.Summary;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class SummaryReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    public static final byte[] CF = "cf".getBytes();
    public static final byte[] COUNT = "count".getBytes();

    @SuppressWarnings("deprecation")
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int i = 0;
        for (IntWritable val : values) {
            i += val.get();
        }
        // The counted cell value becomes the row key of the output table;
        // its total is stored in the cf:count column.
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add(CF, COUNT, Bytes.toBytes(i));
        context.write(null, put);
    }
}
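Once the job has run, each distinct cell value of the source table becomes a row key in test, with its total in cf:count. A minimal sketch for reading one count back (the class name CheckCount and the row key "someValue" are placeholders; the int decoding matches what the reducer stored):

package com.datacenter.HbaseMapReduce.Summary;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Reads one counted value back from the output table "test".
public class CheckCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "test");
        try {
            Get get = new Get(Bytes.toBytes("someValue")); // placeholder cell value
            Result result = table.get(get);
            byte[] raw = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count"));
            if (raw != null) {
                System.out.println("count = " + Bytes.toInt(raw)); // the reducer stored an int
            }
        } finally {
            table.close();
        }
    }
}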
(3) The main class that loads the configuration and submits the job
package com.datacenter.HbaseMapReduce.Summary;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Counts, for every cell value in the source table, how many times it occurs in the whole table.
public class SummaryMain {
    static String rootdir = "hdfs://hadoop3:8020/hbase";
    static String zkServer = "hadoop3";
    static String port = "2181";

    private static Configuration conf;
    private static HConnection hConn = null;

    public static void HbaseUtil(String rootDir, String zkServer, String port) {
        conf = HBaseConfiguration.create(); // load the default configuration
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
        try {
            hConn = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        HbaseUtil(rootdir, zkServer, port);

        Job job = new Job(conf, "ExampleSummary");
        job.setJarByClass(SummaryMain.class); // class that contains mapper and reducer

        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob("score", // input table
                scan,                 // Scan instance to control CF and attribute selection
                SummaryMapper.class,  // mapper class
                Text.class,           // mapper output key
                IntWritable.class,    // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob("test", // output table
                SummaryReducer.class, // reducer class
                job);
        job.setNumReduceTasks(1);     // at least one, adjust as required

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}
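If only the cf:attr1 column should be counted (the single-column mapper variant sketched above), the scan can be narrowed so the job does not ship every cell to the mappers. A one-line addition before initTableMapperJob, assuming that column exists in the source table:

// Restrict the scan to the single column being counted.
scan.addColumn("cf".getBytes(), "attr1".getBytes());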