zoukankan      html  css  js  c++  java
  • [How to] MapReduce on HBase ----- 简单二级索引的实现

    1.简介

      MapReduce计算框架是二代hadoop的YARN一部分,能够提供大数据量的平行批处理。MR只提供了基本的计算方法,之所以能够使用在不用的数据格式上包括HBase表上是因为特定格式上的数据读取和写入都实现了各自的inputformat和outputformat,这样MR就通过这两个接口屏蔽了各个数据源的产异性,统一计算框架。本文主要介绍如何让HBase表作为MR计算框架的输入和输出源,并通过实现一个简历二级索引的小例子来介绍。

    2. HBase与MR关系

      HBase和MapReduce,这两者并没有直接关系,隶属于不同的项目。这里讲到的MapReduce on HBase是指利用HBase表做为MR计算框架的数据输入源或者输出源源,使得能够利用MR的并行计算能力计算HBase的内部数据。

    3. 运行环境

      之前所述,HBase和MapReduce没有直接关系,所以在编程的时候我们需要分别引入MR和HBase包,在运行的时候也要做相关的设置让HBase的包被MR感知到。

    在运行HBase相关的MR任务的时候我们可以将HBase相关包和配置文件拷贝到Hadoop运行目录中,如hbase-site.xml 拷贝到$HADOOP_HOME/conf再将HBase jars 拷贝到 $HADOOP_HOME/lib,但是并不推荐这样的做法,因为一会污染hadoop的安装环境,二还需要重启hadoop才能起效。

      所以我们可以按如下的推荐做法来运行MR程序

    $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar <your jar> <param .......>

       例如在我环境下我利用如下命令提交HBase自带的样例程序:

    [hadoop@xufeng-3 lib]$ HADOOP_CLASSPATH=`/opt/hadoop/hbase/bin/hbase classpath` hadoop jar hbase-server-1.0.0-cdh5.4.2.jar rowcounter usertable

      以上会将/opt/hadoop/hbase/bin/hbase classpath 下的所有文件拷贝的hdfs上以供后续程序运行时候引用,缺点就是可能只能在安装有HBase环境的机器上执行。

    4.简单二级索引的实现

      下面以一个简单的二级索引实现为例子讲解HBase MR程序的编写。

      需要注意的是现在HBase包存在两套MR引用包,分别是org.apache.hadoop.hbase.mapredorg.apache.hadoop.hbase.mapreduce。称之为旧API和新API。通常社区推荐的是新API,旧API后续版本有被淘汰的计划。

      所谓基于框架的代码实现之前的博客也有介绍,简单来说就是:

      1. 书写固定的框架代码

      2. 在框架中填充自身业务的代码逻辑

      固定框架代码

        一般的MR程序中Mapper方法是必须的,Reducer方法就根据业务需要吧。

        下属代码中TableMapReduceUtil为了我们提供了极大的便利,她在内部为我们指定了TableInputFormat和TableOutputFormat等一些列的工作。

    /**
     * 利用MR程序简历HBase二级索引
     * @author newbeefeng
     *
     */
    public class YourAction {
    
        /**
         * 运行方法
         * @param args
         * @throws IOException
         * @throws ClassNotFoundException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 创建配置
            Configuration conf = HBaseConfiguration.create();
            
            // 创建job
            Job job = Job.getInstance(conf);
            
            // 设定job名称
            job.setJobName("名称");
            
            // 设定任务类(当前类)
            job.setJarByClass(YourAction.class);
            
            // 扫描
            Scan scan = new Scan();
            // 设定caching
            scan.setCaching(1000);
            // 对于mr程序来说必须设定为false
            scan.setCacheBlocks(false);
            
            // 利用TableMapReduceUtil初始化mapper
            TableMapReduceUtil.initTableMapperJob("数据源表名", scan, YourMapper.class, Text.class, Text.class, job);
               
            // 利用TableMapReduceUtil初始化reducer
            TableMapReduceUtil.initTableReducerJob("数据输出表名", YourReducer.class, job);
            
            // 提交并等待任务运行完毕
            boolean b = job.waitForCompletion(true);
            if (!b) {
              throw new IOException("error with job!");
            }
            
        }
        
    }
    
    /**
     * 实现具体的mapper类,这个类定义是必须的,因为mr任务可以没有reducer但是一定要有mapper
     * 
     * 此类继承TableMapper,此抽象类帮助我们实现了基本默认实现,用户只要关心具体的业务即可
     * 
     * 
     * @author newbeefeng
     *
     */
    class YourMapper extends TableMapper<Text,Text>
    {
    
        // 实现具体map业务逻辑
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
        }
    
    }
    
    /**
     * 实现具体的reducer类
     * 
     * 此类继承TableReducer,此抽象类帮助我们实现了基本默认实现,用户只要关心具体的业务即可
     * 
     * 
     * @author newbeefeng
     *
     */
    class YourReducer extends TableReducer<Text, Text, ImmutableBytesWritable>
    {
    
        // 实现具体mreduce业务逻辑
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
       
        }
        
    }

      基于框架代码的简单二级索引的业务实现

    /**
     * 利用MR程序简历HBase二级索引
     * @author newbeefeng
     *
     */
    public class BatchCreateSecondIndex {
    
        /**
         * 运行方法
         * @param args
         * @throws IOException
         * @throws ClassNotFoundException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 创建配置
            Configuration conf = HBaseConfiguration.create();
            
            // 创建job
            Job job = Job.getInstance(conf);
            
            // 设定job名称
            job.setJobName("mapreduce on HBase for create second index!");
            
            // 设定任务类(当前类)
            job.setJarByClass(BatchCreateSecondIndex.class);
            
            // 扫描
            Scan scan = new Scan();
            // 设定caching
            scan.setCaching(1000);
            // 对于mr程序来说必须设定为false
            scan.setCacheBlocks(false);
            
            // 利用TableMapReduceUtil初始化mapper
            TableMapReduceUtil.initTableMapperJob("mr_secondindex_resouce", scan, IndexMapper.class, Text.class, Text.class, job);
               
            // 利用TableMapReduceUtil初始化reducer
            TableMapReduceUtil.initTableReducerJob("mr_secondindex_result", IndexReducer.class, job);
            
            // 提交并等待任务运行完毕
            boolean b = job.waitForCompletion(true);
            if (!b) {
              throw new IOException("error with job!");
            }
            
        }
        
    }
    
    /**
     * 实现具体的mapper类,这个类定义是必须的,因为mr任务可以没有reducer但是一定要有mapper
     * 
     * 此类继承TableMapper,此抽象类帮助我们实现了基本默认实现,用户只要关心具体的业务即可
     * 
     * 
     * @author newbeefeng
     *
     */
    class IndexMapper extends TableMapper<Text,Text>
    {
    
        // 实现具体map业务逻辑
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            Text k = new Text(Bytes.toString(key.get()));
            Text v = new Text(Bytes.toString(value.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))));
            // 这里其实是直接将每行数据给了reduce,不做任何处理,其实这个二级索引完全可以在map阶段完成全部工作
            // 但是为了演示需要,还是写了reduce
            System.out.println("k = " + k);
            System.out.println("v = " + v);
            context.write(k, v);
        }
    
    }
    
    /**
     * 实现具体的reducer类
     * 
     * 此类继承TableReducer,此抽象类帮助我们实现了基本默认实现,用户只要关心具体的业务即可
     * 
     * 
     * @author newbeefeng
     *
     */
    class IndexReducer extends TableReducer<Text, Text, ImmutableBytesWritable>
    {
    
        // 实现具体mreduce业务逻辑
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            Text value = null;
            
            // 根据map逻辑。values中也只会有一个数据
            for(Text text : values)
            {
                value = text;
            }
            
            // 构造put将数据写入当job中指定的表中
            Put put = new Put(Bytes.toBytes(key.toString() + "|" + value.toString()));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(value.toString()));
            System.out.println(put);
            
            // 执行写入
            context.write(null, put);    
        }
        
    }

    5. 测试

      将上述工程打包后,放入HBase环境机器上执行:

    HADOOP_CLASSPATH=`/opt/hadoop/hbase/bin/hbase classpath` hadoop jar Test-MapreduceOnHBase.jar cn.com.newbee.feng.mr.BatchCreateSecondIndex

      测试结果:现在索引表中value列已经在rowkey中:

    hbase(main):002:0> scan 'mr_secondindex_resouce'
    ROW                                            COLUMN+CELL                                                                                                                           
     lisi                                          column=f:age, timestamp=1469887264781, value=25                                                                                       
     wangwu                                        column=f:age, timestamp=1469887270347, value=30                                                                                       
     zhangsan                                      column=f:age, timestamp=1469887260046, value=20                                                                                       
     zhaoliu                                       column=f:age, timestamp=1469887275702, value=35                                                                                       
    4 row(s) in 0.3490 seconds
    
    hbase(main):003:0> scan 'mr_secondindex_result'
    ROW                                            COLUMN+CELL                                                                                                                           
     lisi|25                                       column=f:age, timestamp=1469890284944, value=25                                                                                       
     wangwu|30                                     column=f:age, timestamp=1469890284944, value=30                                                                                       
     zhangsan|20                                   column=f:age, timestamp=1469890284944, value=20                                                                                       
     zhaoliu|35                                    column=f:age, timestamp=1469890284944, value=35                                                                                       
    4 row(s) in 0.0280 seconds

      

    6. 总结

      MapReduce on HBase 内部其实还是使用了HBase客户端插入的方式将数据在MAP阶段或者在reduce阶段将数据通过API插入到目标表中。HBase为了配合MR计算框架实现了TableInputFormat和TableOutputFormat。并为开发者提供了便利的API去操作如TableMapReduceUtil以及TableMapper和TableReducer等,用户只要将注意力集中在具体的map和reduce业务上即可。

    7.参考:

        https://hbase.apache.org/book.html#mapreduce

    8.代码:

      下载地址:https://github.com/xufeng79x/MapreduceOnHBaseTest

      

  • 相关阅读:
    nunit2.5.7 单元测试时提示:“当前不会命中断点 还没有为该文档加载任何符号”
    文件下载报错:引发类型为“System.OutOfMemoryException”的异常-.Net 内存溢出
    asp.net 访问页面访问统计实现 for iis7
    easyui tree 更改图标
    asp.net 访问页面访问统计实现
    记一次空格引起的查询问题
    SVN如何忽略dll文件和bin目录
    vmware 中安装Ghost XP 版本心得
    冒泡排序
    JS数组去重
  • 原文地址:https://www.cnblogs.com/ios123/p/6386383.html
Copyright © 2011-2022 走看看