zoukankan      html  css  js  c++  java
  • 《HBase in Action》 第三章节的学习总结 ---- 如何编写和运行基于HBase的MapReduce程序

    HBase之所以与Hadoop是最好的伙伴,我理解就因为两点:1.HADOOP的HDFS,为HBase提供了分布式的存储方式;2.HADOOP的MR为HBase提供的分布式的计算方法。u

    其中第一点,主要是HBase在HDFS的支撑下,实现了HRegion来进行分布式的管理。HBase中除了我们定义的数据表格外,其自身还有两类表格:-ROOT-表和.META.表。在分布式的环境下,客户端定位到要访问的具体某行数据,需要依次通过:唯一的-ROOT-表,具体某个.META.表,具体某个RegionServer,具体某个Region,到具体的行。这部分的应用主要在于如何搭建分布式环境

    接来下,这里就总结一下我对于基于HBase的MR程序的编写和运行的理解

    1.准备工作

        环境:CDH4.5,其中提供了hadoop-2.0.0-cdh4.5.0,hbase-0.94.6-cdh4.5.0。基于ubuntu12.04,搭建伪分布式环境

        将hadoop和hbase解压缩后,除了常规的一系列xml文件定义外,还要让hadoop在执行MR时,能调用到HBase。为此,需要特别做的事情,如下:

        A.定义环境 变量:export HADOOP_CLASSPATH=${HBASE_HOME}/hbase-0.94.6-cdh4.5.0-security.jar:${HBASE_HOME}/conf:${HBASE_HOME}/lib/zookeeper-3.4.5-cdh4.5.0.jar

        B.将HBase里面的hbase-site.xml文件 拷贝到 CDH4Dev/hadoop/etc/hadoop 目录;

        C.将HBase里面的hbase-0.94.6-cdh4.5.0-security.jar 拷贝到 CDH4Dev/hadoop/share/hadoop/common 目录;将HBase里面的 lib/zookeeper-3.4.5-cdh4.5.0.jar 拷贝到/home/hadoop/CDH4Dev/hadoop/share/hadoop/common 目录;

        D.依次启动:start-dfs.sh; start-yarn.sh和start-hbase.sh

    2. 编写基于HBase的MR程序

    /**
     * 基于HBase的Mapreduce处理
     * 参考 http://www.cnblogs.com/liangzh/archive/2012/04/19/2457703.html
     */
    package net.lagujw.hbase;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    /**
     * @author hadoop
     *
     */
    public class HBasePracticeMR {
        /**
         * @param args
         */
        //打印表信息
        private static void showtableinfo(HTable table) {
            Scan s = new Scan();
            try {
                ResultScanner sResult = table.getScanner(s);
                for(Result rr : sResult) {
                    for(KeyValue kvrr:rr.raw()) {
                        System.out.println("rowkey is:" + Bytes.toString(kvrr.getRow()));
                        System.out.println("family is:" + Bytes.toString(kvrr.getFamily()));
                        System.out.println("qualify is:" + Bytes.toString(kvrr.getQualifier()));
                        System.out.println("timestamp is: " + kvrr.getTimestamp());
                        System.out.println("value is:" + Bytes.toString(kvrr.getValue()));
                    }
                }
            }catch (IOException e) {
                System.out.println("in showtableinfo, error is " + e.toString());
            }
        }
        
        
        //创建表
        public static void create_intable(Configuration conf) {
            try {
                HBaseAdmin admin = new HBaseAdmin(conf);
                
                /*创建一个表:blog*/
                if(admin.tableExists("blog")) {
                    admin.disableTable("blog");
                    admin.deleteTable("blog");
                }
                
                HTableDescriptor tdesc = new HTableDescriptor("blog");
                HColumnDescriptor cdesc1 = new HColumnDescriptor("article");    
                cdesc1.setMaxVersions(3);
                tdesc.addFamily(cdesc1);
                
                HColumnDescriptor cdesc2 = new HColumnDescriptor("author");    
                cdesc2.setMaxVersions(3);
                tdesc.addFamily(cdesc2);
                
                admin.createTable(tdesc);
                
                //写批量数据
                System.out.println("To add a lots !! ");
                HTable blogTable = new HTable(conf, "blog");
                blogTable.setAutoFlush(false);
                List<Put> puts = new ArrayList<Put>();
                Put p1 = new Put(Bytes.toBytes("1"));
                p1.add(Bytes.toBytes("article"), Bytes.toBytes("content"), Bytes.toBytes("HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big Data."));
                p1.add(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop,HBase,NoSQL"));
                p1.add(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes("Head First HBase"));
                
                p1.add(Bytes.toBytes("author"), Bytes.toBytes("name"), Bytes.toBytes("hujinjun"));
                //p1.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("yedu"));
                
                //todo 若同一个列存放多个cell,如何自动带上timestamp?如下的参数1或者2用于设置时间戳版本号,但是这样比较山寨
                p1.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), 1, Bytes.toBytes("yedu"));
                p1.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), 2, Bytes.toBytes("一叶渡江")); //带时间参数
                
                Put p2 = new Put(Bytes.toBytes("10"));
                p2.add(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop"));
                p2.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("heyun"));

                Put p3 = new Put(Bytes.toBytes("100"));
                p3.add(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("hbase,nosql"));
                p3.add(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("shenxiu"));

                puts.add(p1);
                puts.add(p2);
                puts.add(p3);
                
                blogTable.put(puts);
                blogTable.flushCommits();
                
                showtableinfo(blogTable);
                        
            }catch (IOException e) {
                System.out.println(e.toString());
            }
        }
        
        public static void create_outtable(Configuration conf) {
            try {
                HBaseAdmin admin = new HBaseAdmin(conf);
                
                /*创建一个空表:tag_friend;由于没有定义rowkey,则表在HDFS中实际是没有建立的*/
                if(admin.tableExists("tag_friend")) {
                    admin.disableTable("tag_friend");
                    admin.deleteTable("tag_friend");
                }
                
                HTableDescriptor tdesc = new HTableDescriptor("tag_friend");
                HColumnDescriptor cdesc = new HColumnDescriptor("person");    
                cdesc.setMaxVersions(3);
                tdesc.addFamily(cdesc);
                
                admin.createTable(tdesc);
                
                showtableinfo(new HTable(conf, "tag_friend"));
                
            }catch (IOException e) {
                System.out.println(e.toString());
            }
        }
        
        //自定义map任务
        //这里只需要定义输出的key/value类型即可;默认的<输入key,value>类型是<ImmutableBytesWritable,Result>已经被TableMapper隐藏了--这和hadoop自己的MR不同
        public static class Mymap extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
            //map方法的入参,包括输入的<key1,value1>和输出的上下文对象:Context
            public void map(ImmutableBytesWritable row, Result values,
                    Context context) throws IOException,InterruptedException {
                ImmutableBytesWritable value = null;
                String[] tags = null;
                
                //这里的values,就是Scan结果中的一行Result,也就是和map方法的入参:ImmutableBytesWritable row相对应的那一行的Scan结果
                //这里的KeyValue kv就是此Result中的单一版本的结果值。Result里面的结果值是可以分版本的
                for(KeyValue kv : values.raw()){
                    if("author".equals(Bytes.toString(kv.getFamily()))
                            &&"nickname".equals(Bytes.toString(kv.getQualifier()))){
                        value = new ImmutableBytesWritable(kv.getValue());
                    }
                    if("article".equals(Bytes.toString(kv.getFamily()))
                            &&"tags".equals(Bytes.toString(kv.getQualifier()))){
                        tags = Bytes.toString(kv.getValue()).toLowerCase().split(",");
                    }
                }
                
                for(int i=0;i<tags.length;i++){
                    ImmutableBytesWritable key = new ImmutableBytesWritable(Bytes.toBytes(tags[i]));
                    context.write(key, value); //设置map任务的输出
                }
            }
        }
        
        //自定义reduce任务
        //这里只有KEYIN, VALUEIN, KEYOUT,没有定义valueOut,因为reduce的ValueOut一般都是Put或Delete(因为只有Put和Delete可以写到一个HTable中),所以ValueOutu也已经被TableMapper隐藏了--这和hadoop自己的MR不同
        public static class Myreduce extends TableReducer<ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable> {
            //reduce方法的入参,包括输入的<key2,value2链>和输出的上下文对象:Context  
            public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable>values,
                        Context context) throws IOException,InterruptedException {
                String friends = "";
                for(ImmutableBytesWritable val : values){
                    friends += (friends.length()>0?",":"") + Bytes.toString(val.get());
                }
                
                //定义一个用于写入HTable的Put对象
                Put put = new Put(key.get()); //key的值就作为Rowkey了
                put.add(Bytes.toBytes("person"),Bytes.toBytes("nicknames"), Bytes.toBytes(friends));
                   
                context.write(key, put); //Reduce的任务只要执行到:提供<Rowkey,Put>或者<RowKey,Delete>就可以了;Reduce机制本身会负责将此Put或Delete,按照Rowkey,写到对应的Region中
            }
        }
        
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            try {
                Configuration conf = HBaseConfiguration.create();
                
                //先把输入的表,创建出来,并填写内容
                create_intable(conf);
                
                //再把输出的表,创建好框架,但是不填写内容
                create_outtable(conf);
                
                Job myjob = new Job(conf, "HBaseMR");
                myjob.setJarByClass(HBasePracticeMR.class);
                
                //定义scan,用于控制输入给map的内容。Scan在MR中用处很大
                Scan s = new Scan();
                s.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
                s.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));
                s.setMaxVersions(); //增加扫描条件,获得所有版本
                
                //初始化map任务
                //“blog”:table名,用于指定从哪个table读取数据
                //s:Scan实例,用于从指定的table中扫描得到map任务的输入数据
                //Mymap.class:自定义的map类,这里就负责实际的map任务实现
                //第一个ImmutableBytesWritable.class:指定outputkey的类型
                //第二个ImmutableBytesWritable.class:指定outputvalue的类型
                //myjob:代表此次MR的任务
                TableMapReduceUtil.initTableMapperJob("blog", s, Mymap.class,
                        ImmutableBytesWritable.class, ImmutableBytesWritable.class, myjob);
                
                //初始化Reduce任务
                //“tag_friend”:table名,用于指定要往哪个table写入数据
                //Myreduce.class:自定义的reduce类,这里就负责实际的reduce任务实现
                //myjob:代表此次MR的任务
                TableMapReduceUtil.initTableReducerJob("tag_friend", Myreduce.class, myjob);
                
                System.exit(myjob.waitForCompletion(true)?0:1);
            }catch (IOException e) {
                System.out.println(e.toString());
            }catch (ClassNotFoundException e) {
                System.out.println(e.toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println("func over!");            
            }
        }
    }

    在Eclipse indigo 版本中,将此文件export成jar包后(选择export-JAR File即可):HBasePracticeMR.jar,然后运行:hadoop jar HBasePracticeMR.jar即可。

  • 相关阅读:
    centos 编码问题 编码转换 cd到对应目录 执行 中文解压
    centos 编码问题 编码转换 cd到对应目录 执行 中文解压
    centos 编码问题 编码转换 cd到对应目录 执行 中文解压
    Android MVP 十分钟入门!
    Android MVP 十分钟入门!
    Android MVP 十分钟入门!
    Android MVP 十分钟入门!
    mysql备份及恢复
    mysql备份及恢复
    mysql备份及恢复
  • 原文地址:https://www.cnblogs.com/lagujw/p/3634355.html
Copyright © 2011-2022 走看看