HBase with MapReduce (MultiTable Read)

    HBase has no built-in operation for joining two tables. To implement a two-table join, or to access a second table while querying the first, you can do it with MapReduce, as shown below. Because the lookup happens entirely in the map phase, the job does not need a reduce phase.

    (1) The mapper implementation

    package com.datacenter.HbaseMapReduce.MultiReadTable;

    import java.io.IOException;
    import java.util.Map.Entry;
    import java.util.NavigableMap;

    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class MultiTableReadMapper extends TableMapper<Text, LongWritable> {

        private HConnection hconn = null;
        private HTableInterface table = null;
        private ResultScanner rs = null;

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // Print the current row of the primary table ("score").
            printResult(value);

            // Print the contents of the second table. Each Result returned by the
            // scanner holds one row's data. Note that the scanner is fully
            // consumed here on the first map() call; later calls get nothing
            // further from it.
            Result temp = rs.next();
            while (temp != null) {
                printResult(temp);
                temp = rs.next();
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            // Open a connection to the second table ("test") and start scanning it.
            hconn = MultiReadTableMain.HbaseUtil(
                    MultiReadTableMain.rootdir, MultiReadTableMain.zkServer,
                    MultiReadTableMain.port);

            table = hconn.getTable("test");

            Scan scan = new Scan();
            scan.setCaching(500); // 1 is the default in Scan, which will be bad for
                                  // MapReduce jobs
            scan.setCacheBlocks(false); // don't set to true for MR jobs

            rs = table.getScanner(scan);
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Release the scanner, table, and connection opened in setup().
            if (rs != null) rs.close();
            if (table != null) table.close();
            if (hconn != null) hconn.close();
        }

        // Print a Result in order: rowkey, then family / column / version / value.
        public void printResult(Result rs) {

            if (rs.isEmpty()) {
                System.out.println("result is empty!");
                return;
            }

            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temps = rs.getMap();
            String rowkey = Bytes.toString(rs.getRow()); // obtain the rowkey
            System.out.println("rowkey->" + rowkey);
            for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temp : temps.entrySet()) {
                System.out.print("\tfamily->" + Bytes.toString(temp.getKey()));
                for (Entry<byte[], NavigableMap<Long, byte[]>> value : temp.getValue().entrySet()) {
                    System.out.print("\tcol->" + Bytes.toString(value.getKey()));
                    for (Entry<Long, byte[]> va : value.getValue().entrySet()) {
                        System.out.print("\tversion->" + va.getKey());
                        System.out.print("\tvalue->" + Bytes.toString(va.getValue()));
                        System.out.println();
                    }
                }
            }
        }

    }
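
    In practice a join usually matches rows by key instead of dumping the whole second table (and, as noted in the comments above, the scanner is exhausted after the first map() call anyway). Below is a minimal sketch of that variant, assuming, purely as an illustration and not something the original states, that "score" and "test" share the same rowkey scheme. It replaces map() in the class above, needs one extra import (org.apache.hadoop.hbase.client.Get), and lets setup() open only the table, with no scanner.

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        printResult(value); // current row of the primary table ("score")

        // Point lookup: fetch only the row of "test" with the same rowkey.
        Get get = new Get(key.copyBytes());
        Result joined = table.get(get); // 'table' is the field opened in setup()
        if (!joined.isEmpty()) {
            printResult(joined); // matching row from "test", if present
        }
    }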
    

     (2) The driver (main) class

    package com.datacenter.HbaseMapReduce.MultiReadTable;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    public class MultiReadTableMain {
        static public String rootdir = "hdfs://hadoop3:8020/hbase";
        static public String zkServer = "hadoop3";
        static public String port = "2181";

        private static Configuration conf;
        private static HConnection hConn = null;

        public static HConnection HbaseUtil(String rootDir, String zkServer, String port) {

            conf = HBaseConfiguration.create(); // load the default HBase configuration
            conf.set("hbase.rootdir", rootDir);
            conf.set("hbase.zookeeper.quorum", zkServer);
            conf.set("hbase.zookeeper.property.clientPort", port);

            try {
                hConn = HConnectionManager.createConnection(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return hConn;
        }

        public static void main(String[] args) throws Exception {
            HbaseUtil(rootdir, zkServer, port);

            Job job = Job.getInstance(conf, "ExampleRead");
            job.setJarByClass(MultiReadTableMain.class); // jar that contains the mapper

            Scan scan = new Scan();
            scan.setCaching(500); // 1 is the default in Scan, which will be bad for
                                  // MapReduce jobs
            scan.setCacheBlocks(false); // don't set to true for MR jobs
            // set other scan attrs

            TableMapReduceUtil.initTableMapperJob("score", // input HBase table name
                    scan, // Scan instance to control CF and attribute selection
                    MultiTableReadMapper.class, // mapper
                    null, // mapper output key
                    null, // mapper output value
                    job);
            job.setOutputFormatClass(NullOutputFormat.class); // because we aren't
                                                              // emitting anything
                                                              // from the mapper

            boolean b = job.waitForCompletion(true);
            if (!b) {
                throw new IOException("error with job!");
            }
        }

    }
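
    Where the driver says "// set other scan attrs", the scan over "score" can be narrowed so the mapper only receives the cells it needs. A short sketch with made-up family and qualifier names (they are not from the original tables; Bytes is org.apache.hadoop.hbase.util.Bytes):

    Scan scan = new Scan();
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    scan.addFamily(Bytes.toBytes("course")); // hypothetical family: read all of its columns
    // or, narrower still, a single column:
    // scan.addColumn(Bytes.toBytes("course"), Bytes.toBytes("math"));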
    
Original post: https://www.cnblogs.com/ljy2013/p/4820076.html