zoukankan      html  css  js  c++  java
  • HBase与MapReduce集成

    即HBase作为MapReduce的数据来源,MapReduce 分析,输出数据存储在HBase表中

    CLASSPATH

    HBase, MapReduce, and the CLASSPATH
    By default, MapReduce jobs deployed to a MapReduce cluster do not have access to either the HBase configuration under $HBASE_CONF_DIR or the HBase classes.
    官网bb了很多,意思是说,mapReduce 默认是没有添加HBase的依赖包的,你可以通过添加HBase-site这个配置文件到hadoop配置目录下,但是这样要复制到整个集群;或者你可以编辑Hadoop的CLASSPATH,但这样又会使得你的Hadoop环境受到污染。而且需要重启Hadoop集群才能生效。
    因此,最好的方法是让HBase自己添加自己的依赖包到Hadoop的CLASSPATH,然后再使用程序。

    1.输出MapReduce与HBase集成时候需要的HBase依赖包

    bin/hbase mapredcp
    

    2.于是我们可以,通过以下方法执行程序

    #先将HBase的依赖包告诉世界 (空格) 然后执行mapreduce程序
    $ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar
    

    工具包hbase-server-VERSION.jar含了以下几个功能(超级有用)

    # 统计Cell数目
    CellCounter: Count cells in HBase table.
    
    # 
    WALPlayer: Replay WAL files.
    
    # ******大量的数据加载******重中之重,把TSV、CSV格式的文件通过 MapReduce 直接存储成 hfile(以块存储的HBase文件) 然后加载(移动)到表中去,不走正常的路径一条条插入
    completebulkload: Complete a bulk data load.
    
    # 从一个集群拷贝到另一个集群
    copytable: Export a table from local cluster to peer cluster.
    
    # 导入导出数据从HBase >	HDFS 
    export: Write table data to HDFS.
    exportsnapshot: Export the specific snapshot to a given FileSystem.
    import: Import data written by Export.
    # TSV table分隔 CSV 使用逗号分隔
    importtsv: Import data in TSV format.
    
    # 统计行数
    rowcounter: Count rows in HBase table.
    verifyrep: Compare the data from tables in two different clusters.
    

    MapReduce读写HBase范例程序编写(参考官网)

    package com.gci.hadoop.hbase;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * 需求分析,从数据表user读取info:name到新表basic:info:name
     */
    
    // extends Configured implements Tool 实现Tool接口的run方法,真正的入口的方法
    public class Table_user2basic extends Configured implements Tool {
    
    	public static final String sourceTable = "user";
    	public static final String targetTable = "basic";
    
    	// 一.Mapper class extends TableMapper<KEYOUT输出的Key的类型, VALUEOUT输出的Value的类型>
    	// 原版的Mapper程序是有输入的KV类型,和输出的KV类型四个参数,源码:extends Mapper<ImmutableBytesWritable,
    	// Result, KEYOUT, VALUEOUT>
    	// Put类型为hbase中定义的类型,便于作为Reducer的输入类型,根据reducer输入类型可知
    	public static class ReadUserMapper extends TableMapper<Text, Put> {
    
    		private Text mapOutputKey = new Text();
    
    		@Override
    		public void map(ImmutableBytesWritable key, Result value,
    				Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
    				throws IOException, InterruptedException {
    			// get rowKey
    			String rowKey = Bytes.toString(key.get());
    
    			// set outputRowKey
    			mapOutputKey.set(rowKey);
    
    			// 通过rowKey创建put对象
    			Put put = new Put(key.get());
    
    			// 迭代以获取cell数据
    			for (Cell cell : value.rawCells()) {
    				// add family 详情请看HBase API 使用(让info在前,避免了空指针异常)
    				if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
    					// add column:name
    					if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
    						put.add(cell);
    					}
    				}
    			}
    		}
    	}
    
    	// 二.Reducer calss extends TableReducer<KEYIN, VALUEIN, KEYOUT>
    	// 输出key 类型为ImmutableBytesWritable 实现writeableComparable的字节数组
    	// 输出 value 类型为 Mutation 是 delete put increment append 的父类
    	public static class WriteBasicReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
    
    		@Override
    		public void reduce(Text key, Iterable<Put> values,
    				Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
    				throws IOException, InterruptedException {
    			// 从得到的put中得到数据
    			for (Put put : values) {
    				// 往外写数据
    				context.write(null, put);
    			}
    		}
    
    	}
    
    	// 三.Driver
    	public int run(String[] arg0) throws Exception {
    
    		// create job
    		Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
    
    		// set run job class
    		job.setJarByClass(this.getClass());
    
    		// set job
    		Scan scan = new Scan();
    		scan.setCaching(500); // 每次获取条目数 1 is the default in Scan, which will be bad for MapReduce jobs
    		scan.setCacheBlocks(false); // don't set to true for MR jobs
    		// set other scan attrs
    
    		// set input and set mapper
    		TableMapReduceUtil.initTableMapperJob(sourceTable, // input table
    				scan, // Scan instance to control CF and attribute selection
    				ReadUserMapper.class, // mapper class
    				Text.class, // mapper output key
    				Put.class, // mapper output value
    				job);
    
    		// set reducer and output
    		TableMapReduceUtil.initTableReducerJob(targetTable, // output table
    				WriteBasicReducer.class, // reducer class
    				job);
    		job.setNumReduceTasks(1); // 设置Reduce个数 at least one, adjust as required
    
    		// 提交 submit job
    		Boolean isSuccess = job.waitForCompletion(true);
    		return isSuccess ? 0 : 1;
    	}
    
    	public static void main(String[] args) throws Exception {
    		// get configuration
    		Configuration configuration = HBaseConfiguration.create();
    
    		// submit job 提交job
    		int status = ToolRunner.run(configuration, new Table_user2basic(), args);
    
    		// exit program 结束程序
    		System.exit(status);
    
    	}
    }
  • 相关阅读:
    JQuery第一部分
    linq高级查与分页
    怎么修改数据库信息例子
    怎样连接客户端和操作数据库
    php数组的合并与方法
    php函数与数组
    php变量类型,运算符与表达式
    php正则表达式
    数组和字符串的几种常见替代方法
    php变量,类型和表达式
  • 原文地址:https://www.cnblogs.com/cenzhongman/p/7367696.html
Copyright © 2011-2022 走看看