zoukankan      html  css  js  c++  java
  • 《HBase in Action》 第三章节的学习总结 ---- 如何编写和运行基于HBase的MapReduce程序

    HBase之所以与Hadoop是最好的伙伴,我理解就因为两点:1.HADOOP的HDFS,为HBase提供了分布式的存储方式;2.HADOOP的MR为HBase提供的分布式的计算方法。u

    其中第一点,主要是HBase在HDFS的支撑下,实现了HRegion来进行分布式的管理。HBase中除了我们定义的数据表格外,其自身还有两类表格:-ROOT-表和.META.表。在分布式的环境下,客户端定位到要访问的具体某行数据,需要依次通过:唯一的-ROOT-表,具体某个.META.表,具体某个RegionServer,具体某个Region,到具体的行。这部分的应用主要在于如何搭建分布式环境

    接来下,这里就总结一下我对于基于HBase的MR程序的编写和运行的理解

    1.准备工作

        环境:CDH4.5,其中提供了hadoop-2.0.0-cdh4.5.0,hbase-0.94.6-cdh4.5.0。基于ubuntu12.04,搭建伪分布式环境

        将hadoop和hbase解压缩后,除了常规的一系列xml文件定义外,还要让hadoop在执行MR时,能调用到HBase。为此,需要特别做的事情,如下:

        A.定义环境 变量:export HADOOP_CLASSPATH=${HBASE_HOME}/hbase-0.94.6-cdh4.5.0-security.jar:${HBASE_HOME}/conf:${HBASE_HOME}/lib/zookeeper-3.4.5-cdh4.5.0.jar

        B.将HBase里面的hbase-site.xml文件 拷贝到 CDH4Dev/hadoop/etc/hadoop 目录;

        C.将HBase里面的hbase-0.94.6-cdh4.5.0-security.jar 拷贝到 CDH4Dev/hadoop/share/hadoop/common 目录;将HBase里面的 lib/zookeeper-3.4.5-cdh4.5.0.jar 拷贝到/home/hadoop/CDH4Dev/hadoop/share/hadoop/common 目录;

        D.依次启动:start-dfs.sh; start-yarn.sh和start-hbase.sh

    2. 编写基于HBase的MR程序

    /**
     * 基于HBase的Mapreduce处理
     * 参考 http://www.cnblogs.com/liangzh/archive/2012/04/19/2457703.html
     */
    package net.lagujw.hbase;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    /**
     * @author hadoop
     *
     */
    public class HBasePracticeMR {
        /**
         * @param args
         */
        //打印表信息
        private static void showtableinfo(HTable table) {
            Scan s = new Scan();
            try {
                ResultScanner sResult = table.getScanner(s);
                for(Result rr : sResult) {
                    for(KeyValue kvrr:rr.raw()) {
                        System.out.println("rowkey is:" + Bytes.toString(kvrr.getRow()));
                        System.out.println("family is:" + Bytes.toString(kvrr.getFamily()));
                        System.out.println("qualify is:" + Bytes.toString(kvrr.getQualifier()));
                        System.out.println("timestamp is: " + kvrr.getTimestamp());
                        System.out.println("value is:" + Bytes.toString(kvrr.getValue()));
                    }
                }
            }catch (IOException e) {
                System.out.println("in showtableinfo, error is " + e.toString());
            }
        }
        
        
        //创建表
        public static void create_intable(Configuration conf) {
            try {
                HBaseAdmin admin = new HBaseAdmin(conf);
                
                /*创建一个表:blog*/
                if(admin.tableExists("blog")) {
                    admin.disableTable("blog");
                    admin.deleteTable("blog");
                }
                
                HTableDescriptor tdesc = new HTableDescriptor("blog");
                HColumnDescriptor cdesc1 = new HColumnDescriptor("article");    
                cdesc1.setMaxVersions(3);
                tdesc.addFamily(cdesc1);
                
                HColumnDescriptor cdesc2 = new HColumnDescriptor("author");    
                cdesc2.setMaxVersions(3);
                tdesc.addFamily(cdesc2);
                
                admin.createTable(tdesc);
                
                //写批量数据
                System.out.println("To add a lots !! ");
                HTable blogTable = new HTable(conf, "blog");
                blogTable.setAutoFlush(false);
                List<Put> puts = new ArrayList<Put>();
                Put p1 = new Put(Bytes.toBytes("1"));
                p1.add(Bytes.toBytes("article"), Bytes.toBytes("content"), Bytes.toBytes("HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big Data."));
                p1.add(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop,HBase,NoSQL"));
                p1.add(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes("Head First HBase"));
                
                p1.add(Bytes.toBytes("author"), Bytes.toBytes("name"), Bytes.toBytes("hujinjun"));
                //p1.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("yedu"));
                
                //todo 若同一个列存放多个cell,如何自动带上timestamp?如下的参数1或者2用于设置时间戳版本号,但是这样比较山寨
                p1.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), 1, Bytes.toBytes("yedu"));
                p1.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), 2, Bytes.toBytes("一叶渡江")); //带时间参数
                
                Put p2 = new Put(Bytes.toBytes("10"));
                p2.add(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop"));
                p2.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("heyun"));

                Put p3 = new Put(Bytes.toBytes("100"));
                p3.add(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("hbase,nosql"));
                p3.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("shenxiu"));

                puts.add(p1);
                puts.add(p2);
                puts.add(p3);
                
                blogTable.put(puts);
                blogTable.flushCommits();
                
                showtableinfo(blogTable);
                        
            }catch (IOException e) {
                System.out.println(e.toString());
            }
        }
        
        public static void create_outtable(Configuration conf) {
            try {
                HBaseAdmin admin = new HBaseAdmin(conf);
                
                /*创建一个空表:tag_friend;由于没有定义rowkey,则表在HDFS中实际是没有建立的*/
                if(admin.tableExists("tag_friend")) {
                    admin.disableTable("tag_friend");
                    admin.deleteTable("tag_friend");
                }
                
                HTableDescriptor tdesc = new HTableDescriptor("tag_friend");
                HColumnDescriptor cdesc = new HColumnDescriptor("person");    
                cdesc.setMaxVersions(3);
                tdesc.addFamily(cdesc);
                
                admin.createTable(tdesc);
                
                showtableinfo(new HTable(conf, "tag_friend"));
                
            }catch (IOException e) {
                System.out.println(e.toString());
            }
        }
        
        //自定义map任务
        //这里只需要定义输出的key/value类型即可;默认的<输入key,value>类型是<ImmutableBytesWritable,Result>已经被TableMapper隐藏了--这和hadoop自己的MR不同
        public static class Mymap extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
            //map方法的入参,包括输入的<key1,value1>和输出的上下文对象:Context
            public void map(ImmutableBytesWritable row, Result values,
                    Context context) throws IOException,InterruptedException {
                ImmutableBytesWritable value = null;
                String[] tags = null;
                
                //这里的values,就是Scan结果中的一行Result,也就是和map方法的入参:ImmutableBytesWritable row相对应的那一行的Scan结果
                //这里的KeyValue kv就是此Result中的单一版本的结果值。Result里面的结果值是可以分版本的
                for(KeyValue kv : values.raw()){
                    if("author".equals(Bytes.toString(kv.getFamily()))
                            &&"nickname".equals(Bytes.toString(kv.getQualifier()))){
                        value = new ImmutableBytesWritable(kv.getValue());
                    }
                    if("article".equals(Bytes.toString(kv.getFamily()))
                            &&"tags".equals(Bytes.toString(kv.getQualifier()))){
                        tags = Bytes.toString(kv.getValue()).toLowerCase().split(",");
                    }
                }
                
                for(int i=0;i<tags.length;i++){
                    ImmutableBytesWritable key = new ImmutableBytesWritable(Bytes.toBytes(tags[i]));
                    context.write(key, value); //设置map任务的输出
                }
            }
        }
        
        //自定义reduce任务
        //这里只有KEYIN, VALUEIN, KEYOUT,没有定义valueOut,因为reduce的ValueOut一般都是Put或Delete(因为只有Put和Delete可以写到一个HTable中),所以ValueOutu也已经被TableMapper隐藏了--这和hadoop自己的MR不同
        public static class Myreduce extends TableReducer<ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable> {
            //reduce方法的入参,包括输入的<key2,value2链>和输出的上下文对象:Context  
            public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable>values,
                        Context context) throws IOException,InterruptedException {
                String friends = "";
                for(ImmutableBytesWritable val : values){
                    friends += (friends.length()>0?",":"") + Bytes.toString(val.get());
                }
                
                //定义一个用于写入HTable的Put对象
                Put put = new Put(key.get()); //key的值就作为Rowkey了
                put.add(Bytes.toBytes("person"),Bytes.toBytes("nicknames"), Bytes.toBytes(friends));
                   
                context.write(key, put); //Reduce的任务只要执行到:提供<Rowkey,Put>或者<RowKey,Delete>就可以了;Reduce机制本身会负责将此Put或Delete,按照Rowkey,写到对应的Region中
            }
        }
        
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            try {
                Configuration conf = HBaseConfiguration.create();
                
                //先把输入的表,创建出来,并填写内容
                create_intable(conf);
                
                //再把输出的表,创建好框架,但是不填写内容
                create_outtable(conf);
                
                Job myjob = new Job(conf, "HBaseMR");
                myjob.setJarByClass(HBasePracticeMR.class);
                
                //定义scan,用于控制输入给map的内容。Scan在MR中用处很大
                Scan s = new Scan();
                s.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
                s.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));
                s.setMaxVersions(); //增加扫描条件,获得所有版本
                
                //初始化map任务
                //“blog”:table名,用于指定从哪个table读取数据
                //s:Scan实例,用于从指定的table中扫描得到map任务的输入数据
                //Mymap.class:自定义的map类,这里就负责实际的map任务实现
                //第一个ImmutableBytesWritable.class:指定outputkey的类型
                //第二个ImmutableBytesWritable.class:指定outputvalue的类型
                //myjob:代表此次MR的任务
                TableMapReduceUtil.initTableMapperJob("blog", s, Mymap.class,
                        ImmutableBytesWritable.class, ImmutableBytesWritable.class, myjob);
                
                //初始化Reduce任务
                //“tag_friend”:table名,用于指定要往哪个table写入数据
                //Myreduce.class:自定义的reduce类,这里就负责实际的reduce任务实现
                //myjob:代表此次MR的任务
                TableMapReduceUtil.initTableReducerJob("tag_friend", Myreduce.class, myjob);
                
                System.exit(myjob.waitForCompletion(true)?0:1);
            }catch (IOException e) {
                System.out.println(e.toString());
            }catch (ClassNotFoundException e) {
                System.out.println(e.toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println("func over!");            
            }
        }
    }

    在Eclipse indigo 版本中,将此文件export成jar包后(选择export-JAR File即可):HBasePracticeMR.jar,然后运行:hadoop jar HBasePracticeMR.jar即可。

  • 相关阅读:
    FuelPHP 系列(三) ------ Model 模型
    FuelPHP 系列(二) ------ route 路由
    FuelPHP 系列(一) ------ Oil 命令
    微信小程序初探
    基于 Laravel 的 文件管理
    laravel(一)
    Git从零开始(三)
    Git从零开始(二)
    Git从零开始(一)
    Linux服务器学习(二)
  • 原文地址:https://www.cnblogs.com/lagujw/p/3634355.html
Copyright © 2011-2022 走看看