I have recently been learning HBase and came across how to operate on HBase with MapReduce. Below I introduce several scenarios; the details can all be found in the official documentation: http://hbase.apache.org/book.html . From my own study, I think of MapReduce operations on HBase this way: the Map phase handles reading and the Reduce phase handles writing, so one read plus one write covers the full read/write cycle against HBase.
Using MapReduce to read data from an HBase table only involves the read path, so only the Map function needs to be implemented.
(1) The ReadHbaseMapper class must extend TableMapper. The implementation is as follows:
package com.datacenter.HbaseMapReduce.Read;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class ReadHbaseMapper extends TableMapper<Text, Text> {

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        // Process the data for this row from the Result instance.
        printResult(value);
    }

    // Print the contents of a Result in order: rowkey, family, column, version, value.
    public void printResult(Result rs) {
        if (rs.isEmpty()) {
            System.out.println("result is empty!");
            return;
        }
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temps = rs.getMap();
        String rowkey = Bytes.toString(rs.getRow()); // obtain the rowkey
        System.out.println("rowkey->" + rowkey);
        for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temp : temps.entrySet()) {
            System.out.print(" family->" + Bytes.toString(temp.getKey()));
            for (Entry<byte[], NavigableMap<Long, byte[]>> value : temp.getValue().entrySet()) {
                System.out.print(" col->" + Bytes.toString(value.getKey()));
                for (Entry<Long, byte[]> va : value.getValue().entrySet()) {
                    System.out.print(" version->" + va.getKey());
                    System.out.print(" value->" + Bytes.toString(va.getValue()));
                    System.out.println();
                }
            }
        }
    }
}
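Because the class is declared as TableMapper<Text, Text>, the mapper could also emit key/value pairs to the framework instead of just printing them. Below is a minimal, hypothetical sketch of such a map method; the column family "course" and qualifier "math" are assumed names for illustration only, and if you emit output this way you would also pass Text.class for the mapper output key and value classes in initTableMapperJob instead of null.

    // A hypothetical variant of map() that emits (rowkey, cell value) pairs
    // instead of printing them; "course" and "math" are illustrative names only.
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        String rowkey = Bytes.toString(row.get());
        byte[] cell = value.getValue(Bytes.toBytes("course"), Bytes.toBytes("math"));
        if (cell != null) {
            context.write(new Text(rowkey), new Text(Bytes.toString(cell)));
        }
    }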
(2) Add a class with a main function that loads the configuration and sets up the job. The implementation is as follows:
package com.datacenter.HbaseMapReduce.Read;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

// Read data from HBase through the map phase only.
public class ReadHbase {

    static public String rootdir = "hdfs://hadoop3:8020/hbase";
    static public String zkServer = "hadoop3";
    static public String port = "2181";

    private static Configuration conf;
    private static HConnection hConn = null;

    public static void HbaseUtil(String rootDir, String zkServer, String port) {
        conf = HBaseConfiguration.create(); // load the default configuration
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
        try {
            hConn = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        HbaseUtil(rootdir, zkServer, port);

        Job job = new Job(conf, "ExampleRead");
        job.setJarByClass(ReadHbase.class); // class that contains the mapper

        Scan scan = new Scan();
        // Filters can be added here to restrict which rows/columns are scanned (a sketch is given below).
        scan.setCaching(500);       // 1 is the default in Scan, which is bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "score",               // input HBase table name
                scan,                  // Scan instance to control CF and attribute selection
                ReadHbaseMapper.class, // mapper
                null,                  // mapper output key
                null,                  // mapper output value
                job);
        job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from the mapper

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}
At this point the job that traverses a table is complete; in other words, it prints out the contents of the entire table.
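If only part of the table is needed, the Scan created in main() can be narrowed before it is passed to initTableMapperJob, as the comment there notes. The following is a small sketch under assumed names: the family "course", the qualifier "math", and the comparison value "90" are illustrative only, and it requires importing org.apache.hadoop.hbase.filter.SingleColumnValueFilter and org.apache.hadoop.hbase.filter.CompareFilter.

        // Hypothetical scan tuning: read only one column and keep rows whose value
        // compares >= "90" (byte-wise comparison); names and value are illustrative.
        scan.addColumn(Bytes.toBytes("course"), Bytes.toBytes("math"));
        scan.setFilter(new SingleColumnValueFilter(
                Bytes.toBytes("course"), Bytes.toBytes("math"),
                CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes("90")));

With addColumn, only the selected column is shipped to the mappers, which reduces the amount of data each region has to read for the job.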