  • HBase with MapReduce (Summary)

    As we know, HBase does not offer the rich query and aggregation features of a relational database. This post shows how to use MapReduce to count how many times each cell value appears in an HBase table and write the results into a target table.
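
    Note that the output table must already exist before the job runs; the job itself does not create it. Below is a minimal sketch of creating it with the same era of HBase client API used in this post (the table name "test" and column family "cf" are taken from the driver and reducer further down; the class name CreateSummaryTable is just an illustrative helper):

    package com.datacenter.HbaseMapReduce.Summary;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class CreateSummaryTable {
    	public static void main(String[] args) throws Exception {
    		Configuration conf = HBaseConfiguration.create();
    		conf.set("hbase.zookeeper.quorum", "hadoop3"); // same ZooKeeper quorum as the driver below
    		conf.set("hbase.zookeeper.property.clientPort", "2181");

    		HBaseAdmin admin = new HBaseAdmin(conf);
    		// output table "test" with column family "cf", as expected by SummaryReducer
    		HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("test"));
    		desc.addFamily(new HColumnDescriptor("cf"));
    		if (!admin.tableExists("test")) {
    			admin.createTable(desc);
    		}
    		admin.close();
    	}
    }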

    (1) Mapper implementation

    package com.datacenter.HbaseMapReduce.Summary;
    
    import java.io.IOException;
    import java.util.NavigableMap;
    import java.util.Map.Entry;
    
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class SummaryMapper extends TableMapper<Text, IntWritable> { // the type parameters are the key/value types emitted by the map() context
    
    	public static final byte[] CF = "cf".getBytes();
    	public static final byte[] ATTR1 = "attr1".getBytes();
    
    	private final IntWritable ONE = new IntWritable(1);
    	private Text text = new Text();
    
    	@Override
    	protected void map(ImmutableBytesWritable key, Result value, Context context)
    			throws IOException, InterruptedException {
    
    /*		byte[] ss = value.getValue(CF, ATTR1); // alternative: count only the values of one specific family/column; adjust as needed
    		String val = new String(ss);
    		text.set(val); // we can only emit Writables..
    		context.write(text, ONE);*/
    
    		// count the values of all column families and all columns
    		try {
    			DealResult( value , context);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	// count occurrences of every cell value across all column families and columns
    	public void DealResult(Result rs ,Context context) throws Exception {
    
    		if (rs.isEmpty()) {
    			System.out.println("result is empty!");
    			return;
    		}
    
    		NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> tableResult = rs
    				.getMap();
    		String rowkey = Bytes.toString(rs.getRow()); // obtain the row key
    		// System.out.println("rowkey->" + rowkey);
    		for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyResult : tableResult
    				.entrySet()) {
    			// System.out.print("	family->" + Bytes.toString(familyResult.getKey()));
    			for (Entry<byte[], NavigableMap<Long, byte[]>> columnResult : familyResult
    					.getValue().entrySet()) {
    				// System.out.print("	col->" + Bytes.toString(columnResult.getKey()));
    				for (Entry<Long, byte[]> valueResult : columnResult.getValue().entrySet()) {
    					// System.out.print("	version->" + valueResult.getKey());
    					// System.out.print("	value->" + Bytes.toString(valueResult.getValue()));
    					// System.out.println();
    					text.set(new String(valueResult.getValue()));
    					context.write(text, ONE);
    				}
    			}
    		}
    	}
    
    }
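
    One detail of the mapper worth noting: Result.getMap() keys each cell's values by version timestamp, so if the scan requests multiple versions (Scan.setMaxVersions), every returned version is counted separately. If only the newest version of each cell should be counted, Result.getNoVersionMap() gives a two-level family/qualifier/value map instead. A minimal standalone sketch of that traversal against a single row of the input table (the table name "score" comes from the driver below; the class name DealResultDemo and the row key "row1" are placeholders):

    package com.datacenter.HbaseMapReduce.Summary;

    import java.util.Map.Entry;
    import java.util.NavigableMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DealResultDemo {
    	public static void main(String[] args) throws Exception {
    		Configuration conf = HBaseConfiguration.create();
    		conf.set("hbase.zookeeper.quorum", "hadoop3");
    		conf.set("hbase.zookeeper.property.clientPort", "2181");

    		HTable table = new HTable(conf, "score"); // the input table scanned by the job
    		Result rs = table.get(new Get(Bytes.toBytes("row1"))); // "row1" is a placeholder row key
    		if (!rs.isEmpty()) {
    			// getNoVersionMap() keeps only the newest version of each cell,
    			// unlike getMap() used in DealResult, which walks every returned version
    			NavigableMap<byte[], NavigableMap<byte[], byte[]>> map = rs.getNoVersionMap();
    			for (Entry<byte[], NavigableMap<byte[], byte[]>> family : map.entrySet()) {
    				for (Entry<byte[], byte[]> column : family.getValue().entrySet()) {
    					System.out.println(Bytes.toString(family.getKey()) + ":"
    							+ Bytes.toString(column.getKey()) + " -> "
    							+ Bytes.toString(column.getValue()));
    				}
    			}
    		}
    		table.close();
    	}
    }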
    

     (2) Reducer implementation

    package com.datacenter.HbaseMapReduce.Summary;
    
    import java.io.IOException;
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class SummaryReducer extends
    		TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    
    	public static final byte[] CF = "cf".getBytes();
    	public static final byte[] COUNT = "count".getBytes();
    
    	@SuppressWarnings("deprecation")
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    			throws IOException, InterruptedException {
    		int i = 0;
    		for (IntWritable val : values) {
    			i += val.get();
    		}
    		Put put = new Put(Bytes.toBytes(key.toString()));
    		// add a "count" column under family "cf", recording how many times this value occurred;
    		// Put.add(family, qualifier, ts, value) is deprecated and the timestamp 100 is hard-coded here
    		put.add(CF, COUNT, 100, Bytes.toBytes(i));

    		context.write(null, put);
    	}
    
    }
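
    The reducer stores each count as a 4-byte integer (Bytes.toBytes(int)), so reading the result back requires Bytes.toInt rather than treating the cell as a string. A minimal sketch of scanning the output table to check the counts (table, family, and qualifier names follow the reducer and driver; the class name SummaryCheck is just an illustrative helper):

    package com.datacenter.HbaseMapReduce.Summary;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SummaryCheck {
    	public static void main(String[] args) throws Exception {
    		Configuration conf = HBaseConfiguration.create();
    		conf.set("hbase.zookeeper.quorum", "hadoop3");
    		conf.set("hbase.zookeeper.property.clientPort", "2181");

    		HTable table = new HTable(conf, "test"); // output table written by SummaryReducer
    		ResultScanner scanner = table.getScanner(new Scan());
    		for (Result r : scanner) {
    			String value = Bytes.toString(r.getRow()); // row key = the original cell value
    			int count = Bytes.toInt(r.getValue("cf".getBytes(), "count".getBytes()));
    			System.out.println(value + " -> " + count);
    		}
    		scanner.close();
    		table.close();
    	}
    }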
    

     (3) Driver (main class) implementation

    package com.datacenter.HbaseMapReduce.Summary;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    
    // count, for each cell value in the HBase table, how many times it appears across the whole table
    
    public class SummaryMain {
    	static String rootdir = "hdfs://hadoop3:8020/hbase";
    	static String zkServer = "hadoop3";
    	static String port = "2181";
    
    	private static Configuration conf;
    	private static HConnection hConn = null;
    
    	public static void HbaseUtil(String rootDir, String zkServer, String port) {
    
    		conf = HBaseConfiguration.create(); // load the default HBase configuration
    		conf.set("hbase.rootdir", rootDir);
    		conf.set("hbase.zookeeper.quorum", zkServer);
    		conf.set("hbase.zookeeper.property.clientPort", port);
    
    		try {
    			hConn = HConnectionManager.createConnection(conf); // note: this connection is not used by the MapReduce job below
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		HbaseUtil(rootdir, zkServer, port);
    
    		Job job = new Job(conf, "ExampleSummary");
    		job.setJarByClass(SummaryMain.class); // class that contains mapper and
    												// reducer
    
    		Scan scan = new Scan();
    		scan.setCaching(500); // 1 is the default in Scan, which will be bad for
    								// MapReduce jobs
    		scan.setCacheBlocks(false); // don't set to true for MR jobs
    		// set other scan attrs
    
    		TableMapReduceUtil.initTableMapperJob("score", // input table
    				scan, // Scan instance to control CF and attribute selection
    				SummaryMapper.class, // mapper class
    				Text.class, // mapper output key
    				IntWritable.class, // mapper output value
    				job);
    		TableMapReduceUtil.initTableReducerJob("test", // output table
    				SummaryReducer.class, // reducer class
    				job);
    		job.setNumReduceTasks(1); // at least one, adjust as required
    
    		boolean b = job.waitForCompletion(true);
    		if (!b) {
    			throw new IOException("error with job!");
    		}
    
    	}
    
    }
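
    As a follow-up note: if only one specific column should be counted (the commented-out path in SummaryMapper), the driver's Scan can be restricted with scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("attr1")) at the "set other scan attrs" point above, so that cells from other columns are never shipped to the mappers at all.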
    
  • Original article: https://www.cnblogs.com/ljy2013/p/4820056.html