zoukankan      html  css  js  c++  java
  • Hbase 整合 Hadoop 的数据迁移

    上篇文章说了 Hbase 的基础架构,都是比较理论的知识,最近我也一直在搞 Hbase 的数据迁移, 今天就来一篇实战型的,把最近一段时间的 Hbase 整合 Hadoop 的基础知识在梳理一遍,毕竟当初搞得时候还是有点摸不着方向,写下来也方便以后查阅。

    之前使用 Hbase 大多是把它当做实时数据库来做查询使用的,大部分使用的都是 Hbase 的基础 Api, Hbase 与 Hadoop Hive 框架的整合还真是没系统的搞过,话不多说,先看看本文的架构图:

    PS:文中提到的代码见最后 参考资料

    着重点在前两部分,后面的都是大家比较熟悉的部分了。


    1  Hbase 与 Hadoop 集成

    Hbase 与 Hadoop 相关操作主要可以分为如下三种情况:

    • 一张 hbase 表数据导入另一张 hbase 表

    • HDFS 数据导入 Hbase 表

    • HDFS 数据(超大数据)导入 Hbase 表

    以上三种情况的数据迁移基本都是依靠 MR 程序来完成的,所以重点又回到了 MR 编程。

    0hbase表数据导入

    思路:准备 MR 程序将一张 Hbase 表写入到另一张 Hbase 表即可。

    注意:两张 Hbase 表导入数据的列族信息要一致;有数据的 Hbase 在读入数据时要注意非空判断。

    准备工作:


    准备 user1 表 列族 为 f1,f1 中有 age ,name属性 ,作为输入表;

    准备 user2 表,创建列族 f1,作为输出表。

    主要代码:

    Mapper 端:这里注意继承的 是 TableMapper

     1 public class HBaseReadMapper extends TableMapper<Text,Put> {
     2         /**
     3          *
     4          * @param key rowkey
     5          * @param value rowkey 此行的数据  Result 类型
     6          * @param context
     7          * @throws IOException
     8          * @throws InterruptedException
     9          */
    10         @Override
    11         protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    12             //获得rowkey 的字节数组
    13             byte[] rowkey_bytes = key.get();
    14             String rowKeyStr = Bytes.toString(rowkey_bytes);
    15             //准备好 put 对象 用于输出下游
    16             Put put = new Put(rowkey_bytes);
    17             //text 作为输出的 key
    18             Text text = new Text(rowKeyStr);
    19             //输出数据 - 写数据 - 普通 构建put 对象
    20             Cell[] cells = value.rawCells();
    21             //将 f1 : name & age 输出
    22             for (Cell cell : cells) {
    23                 //当前 cell是否是 f1
    24                 //获取列族
    25                 byte[] family = CellUtil.cloneFamily(cell);
    26                 String familyStr = Bytes.toString(family);
    27 28                 if("f1".equals(familyStr)){
    29                     //在判断是否是 name | age
    30                     put.add(cell);
    31                 }
    32 33                 if("f2".equals(familyStr)){
    34                     put.add(cell);
    35                 }
    36             }
    37           //注意非空判断 不然会报错
    38             if(!put.isEmpty()){
    39                 context.write(text,put);
    40             }
    41 42         }
    43     }

    Reduce 端 ,使用 TableReducer:

     1 public class HbaseWriteReducer extends TableReducer<Text,Put,ImmutableBytesWritable> {
     2  3     /**
     4      * 将 map 传过来的数据写出去
     5      * @param key
     6      * @param values
     7      * @param context
     8      * @throws IOException
     9      * @throws InterruptedException
    10      */
    11     @Override
    12     protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
    13         //设置rowkey
    14         ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
    15         //设置rowkey
    16         immutableBytesWritable.set(key.toString().getBytes());
    17         for (Put value : values) {
    18             context.write(immutableBytesWritable,value);
    19         }
    20     }
    21 }
     

    启动类,将 user1 中 f1 列族下 age,name数值写入到 user2 中:

     1 public class Hbase2HbaseMR extends Configured implements Tool {
     2  3    public static void main(String[] args) throws Exception {
     4        Configuration configuration = HBaseConfiguration.create();
     5        //设置 hbase 的zk地址
     6        configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181");
     7        int run = ToolRunner.run(configuration, new Hbase2HbaseMR(), args);
     8        System.exit(run);
     9    }
    10     @Override
    11     public int run(String[] strings) throws Exception {
    12         Job job = Job.getInstance(super.getConf());
    13         job.setJarByClass(Hbase2HbaseMR.class);
    14         //mapper
    15         TableMapReduceUtil.initTableMapperJob(TableName.valueOf("user"),new Scan(), HBaseReadMapper.class,Text.class,Put.class,job);
    16         //reducer
    17         TableMapReduceUtil.initTableReducerJob("user2",HbaseWriteReducer.class,job);
    18         boolean b = job.waitForCompletion(true);
    19 20         return b?0:1;
    21     }
    22 }
     

    0HDFS 导入到Hbase

    思路:准备 MR 程序将 HDFS 数据写入到另一张 Hbase 表即可。

    注意:

    读入的是 Mapper 是 HDFS 操作,写出的 Reduce 是 Hbase 操作;

    HDFS 数据格式要与 Hbase 表对应

    准备工作:


    准备 HDFS 上数据 ;

    准备 user2 表,创建列族 f1,作为输出表。

    主要代码:

    Mapper 端,使用常规 Mapper

     1 public class HdfsMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
     2  3         /**
     4          * HDFS -- Hbase
     5          *
     6          * @param key
     7          * @param value
     8          * @param context
     9          * @throws IOException
    10          * @throws InterruptedException
    11          */
    12         @Override
    13         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    14             //数据原样输出
    15             context.write(value,NullWritable.get());
    16         }
    17     }
     

    Reduce 端,使用 TableReducer :

     1 public static class HBASEReducer extends TableReducer<Text,NullWritable,ImmutableBytesWritable>{
     2         @Override
     3         protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
     4             /**
     5              * key --> 一行数据
     6              * 样例数据:
     7              * 07 zhangsan 18
     8              * 08 lisi 25
     9              * 09 wangwu 20
    10              *
    11              */
    12             //按格式拆分
    13             String[] split = key.toString().split("	");
    14             //构建 put 对象
    15             Put put = new Put(Bytes.toBytes(split[0]));
    16             put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
    17             put.addColumn("f1".getBytes(),"age".getBytes(),split[2].getBytes());
    18             context.write(new ImmutableBytesWritable(split[0].getBytes()),put);
    19         }
    20     }
    21  
     

    启动类:

     1 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
     2             Configuration configuration = HBaseConfiguration.create();
     3             //设置 hbase zk 地址
     4             configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181");
     5             Job job = Job.getInstance(configuration);
     6             job.setJarByClass(Hdfs2HbaseMR.class);
     7             //输入文件路径
     8             FileInputFormat.addInputPath(job,new Path("hdfs://hadoop102:9000/hbase/input"));
     9             job.setMapperClass(HdfsMapper.class);
    10             job.setMapOutputKeyClass(Text.class);
    11             job.setMapOutputValueClass(NullWritable.class);
    12             //指定输出到 Hbase 的 表名
    13             TableMapReduceUtil.initTableReducerJob("user2",HBASEReducer.class,job);
    14             //设置 reduce 个数
    15             job.setNumReduceTasks(1);
    16             boolean b = job.waitForCompletion(true);
    17             System.exit(b?0:1);
    18         }
     

    0HDFS 大数据导入Hbase

    思路:与 2 中的数据导入不同的是这次的数据量比较大,使用常规的 MR 可能耗时非常的长,并且一直占用资源。

    我们可以先将 Hadoop 上存储的 HDFS 文件转换成 HFile 文件,HFile 文件就是 Hbase 底层存储的类型,转换完成后,再将转换好的 HFile 文件指定给对应的 Hbase 表即可。这就是 bulkload 的方式批量加载数据,大致流程如下:

     

    注意:

    由于是文件类型转换,不做计算操作,所以只需要读入的 Mapper 操作,,不需要Reduce操作;

    文件类型转换后 还需要做 Hbase 表与 HFile 文件的映射

    准备工作:


    准备 HDFS 上数据 ;

    准备 user2 表,创建列族 f1,作为输出表。

    主要代码:

    Mapper 端,使用常规 Mapper

     1 public class Hdfs2HFileMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> {
     2     @Override
     3     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     4         String[] split = value.toString().split("	");
     5         //封装输出类型
     6         Put put = new Put(split[0].getBytes());
     7         put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
     8         put.addColumn("f1".getBytes(),"age".getBytes(),split[2].getBytes());
     9         // 将封装好的put对象输出,rowkey 使用 immutableBytesWritable
    10         context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);
    11     }
    12 }

    启动类:

     1 /**
     2  *
     3  * 将HDFS文件写成Hfile格式输出
     4  */
     5 public class Hdfs2HileOut extends Configured implements Tool {
     6  7     public static void main(String[] args) throws Exception {
     8         Configuration configuration = HBaseConfiguration.create();
     9         configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181");
    10         int run = ToolRunner.run(configuration, new Hdfs2HileOut(), args);
    11         System.exit(run);
    12     }
    13     @Override
    14     public int run(String[] strings) throws Exception {
    15         Configuration conf = super.getConf();
    16         Job job = Job.getInstance(conf);
    17         job.setJarByClass(Hdfs2HileOut.class);
    18         FileInputFormat.addInputPath(job,new Path("hdfs://hadoop102:9000/hbase/input"));
    19 20         job.setMapperClass(Hdfs2HFileMapper.class);
    21         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    22         job.setMapOutputValueClass(Put.class);
    23         Connection connection = ConnectionFactory.createConnection(conf);
    24         Table table = connection.getTable(TableName.valueOf("user2"));
    25         //使MR可以向user2表中,增量增加数据
    26         HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("user2")));
    27         //数据写回到HDFS 写成HFILE -》 所以指定输出格式为Hfile
    28         job.setOutputFormatClass(HFileOutputFormat2.class);
    29         //HFile 输出的路径,用于与表映射的输入参数
    30         HFileOutputFormat2.setOutputPath(job,new Path("hdfs://hadoop102:9000/hbase/out_hfile2"));
    31         //开始执行
    32         boolean b = job.waitForCompletion(true);
    33         return b? 0: 1;
    34     }
    35 }
     

    加载类:

    public class LoadHFile2Hbase {
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181");
            //获取数据库连接
            Connection connection = ConnectionFactory.createConnection(configuration);
            Table table = connection.getTable(TableName.valueOf("user2"));
            //构建 LoadIncrementalHfiles 加载 Hfile文件
            LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(configuration);
            // 加载上一步输出的HFile 与表做映射
            loadIncrementalHFiles.doBulkLoad(new Path("hdfs://hadoop102:9000/hbase/out_hfile2"),connection.getAdmin(),table,connection.getRegionLocator(TableName.valueOf("user2")));
        }
    }
     

    至此,HDFS 数据迁移至 Hbase 完成。


    2   Hbase 与 Hive 集成

    hbase 与 hive 相关的数据迁移工作分为两种:

    • hive 表结果 ---> hbase 表

    • hbase 表数据 --->  hive 表

    这部分操作没有代码,在 hive 和 hbase 客户端就能完成操作

    01 准备工作

    1 首先需要将 Hbase下的5个包拷贝到 hive lib 下,建议使用软连接的形式:

    ln -s /home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-client-1.2.0-cdh5.14.2.jar  /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-client-1.2.0-cdh5.14.2.jar   
    ln -s /home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-hadoop2-compat-1.2.0-cdh5.14.2.jar  /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-hadoop2-compat-1.2.0-cdh5.14.2.jar
    ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-hadoop-compat-1.2.0-cdh5.14.2.jar       /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-hadoop-compat-1.2.0-cdh5.14.2.jar          
    ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-it-1.2.0-cdh5.14.2.jar       /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-it-1.2.0-cdh5.14.2.jar   
    ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar        /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar

    2 修改 Hive 的配置文件 hive-site.xml 添加自己的 zk 信息:

    <property>
        <name>hive.zookeeper.quorum</name>
        <value>hadoop102,hadoop103,hadoop104</value>
      </property>
      <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop102,hadoop103,hadoop104</value>
      </property>
     

    3 修改 Hive 的配置文件 hive-env.sh 添加如下信息:

    export HADOOP_HOME=/kkb/install/hadoop-2.6.0-cdh5.14.2/
    export HBASE_HOME=/kkb/install/servers/hbase-1.2.0-cdh5.14.2
    export HIVE_CONF_DIR=/kkb/install/hive-1.1.0-cdh5.14.2/conf
     

    至此 准备工作完成。

    02 hive表导入hbase

    hive 中创建管理表(内部表)与hbase 表完成映射则hive管理表的数据会添加到 hbase 表中 ,命令如下:

    create table course.hbase_score(id int,cname string,score int) 
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
    with serdeproperties("hbase.columns.mapping" = "cf:name,cf:score") 
    tblproperties("hbase.table.name" = "hbase_score");
     

    从命令中可以看出 hbase.table.name 是指的 hbase 表名,hbase.columns.mapping 则值的对应列族下的字段,而 hive 表的 id 则会作为hbase表的 rowkey 进行存储。

    通过向内部表插入数据即可完成数据查询结果的导入。

    insert overwrite table course.hbase_score select id,cname,score from course.score;

    最后查看 hbase 表即可看到数据。

    03 hbase表导入hive 

    hbase 结果映射到 hive表比较简单,创建 hive 外部表即可:

    CREATE external TABLE hbase2hive(id int, name string, score int) 
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name,cf:score") 
    TBLPROPERTIES("hbase.table.name" ="hbase_hive_score");
     

    从命令中可以看出 hbase.table.name 是指的 hbase 表名,hbase.columns.mapping 的值则对应hive表的字段,而 hive 表的 id 则会作取 hbase表的 rowkey 进行存储。

    至此,Hbase 与 Hive 的数据迁移就完成了。


    3  Hbase 协处理器和基础 api

    关于基础api这部分比较详细的介绍就在代码中了,再此我们就简单说一下Hbase 协处理器。

    协处理器是为了解决Hbase早期版本的一些问题,如建立二次索引、复杂过滤器、求和计数分组计数等类sql操作以及访问控制等。

    Hbase 提供两类协处理器:

    • observer 类似数据库的触发器,个人理解类似拦截器的功能;

    • endpoint 类似数据库的存储过程,可以实现类sql的统计操作。

    协处理器的加载方式

    01 静态加载实现

    通过修改 hbase-site.xml 这个文件来实现, 如启动全局 aggregation,能过操纵所有的表数据。只需要在hbase-site.xml里面添加以下配置即可,修改完配置之后需要重启HBase集群。

    <property>
      <name>hbase.coprocessor.user.region.classes</name>
      <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
    </property>

    为所有table加载了一个 cp class,可以用” ,”分割加载多个 class。

    02 动态加载实现

    启用表aggregation,只对特定的表生效。

    下面以协处理器 observer 为例来简单说下操作过程:

    1 创建 两张 hbase 表,user1 ,user2:

    create 'user1','info;
    create 'user2','info';

    2 协处理器代码开发,完成往 user1 表插入数据时,先往 user2 表插入数据,代码如下:

     1 public class MyProcessor extends BaseRegionObserver {
     2     @Override
     3     public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
     4 //获取连接
     5         Configuration configuration = HBaseConfiguration.create();
     6         configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181:hadoop104:2181");
     7         Connection connection = ConnectionFactory.createConnection(configuration);
     8 //涉及多个版本得问题
     9         List<Cell> cells = put.get("info".getBytes(), "name".getBytes());
    10 //将user1表的name 数据也插入到 user2 中
    11         Cell nameCell = cells.get(0);
    12         Put put1 = new Put(put.getRow());
    13         put1.add(nameCell);
    14         Table table = connection.getTable(TableName.valueOf("user2"));
    15         table.put(put1);
    16         table.close();
    17         connection.close();
    18     }
    19 } 
    
    

    3 将开发好的项目打包上传到 HDFS ,路径自定,假设是:

    hdfs://hadoop102:9000/processor/processor.jar
    
    

    4 将 jar 包挂载到 user1 表:

    disable 'user1';
    alter 'user1',METHOD => 'table_att','Coprocessor'=>'hdfs://hadoop102:9000/processor/processor.jar|com.bigdata.comprocessor.MyProcessor|1001|';
    enabled 'user1';
     

    com.bigdata.comprocessor.MyProcessor : 你程序的全类名;

    1001 :协处理器编号,自定义即可,表中协处理器的编号不能重复。

    5 测试向 user1 中插入数据,user2 是否有数据:

     1 public class TestObserver {
     2  3     @Test
     4     public void testPut() throws IOException {
     5  6         //获取连接
     7         Configuration configuration = HBaseConfiguration.create();
     8         configuration.set("hbase.zookeeper.quorum", "hadoop102:2181,hadoop103:2181,hadoop104:2181");
     9         //创建连接对象
    10         Connection connection = ConnectionFactory.createConnection(configuration);
    11         Table proc1 = connection.getTable(TableName.valueOf("user1"));
    12         Put put = new Put("1110001112".getBytes());
    13 14         put.addColumn("info".getBytes(),"name".getBytes(),"hello".getBytes());
    15         put.addColumn("info".getBytes(),"gender".getBytes(),"male".getBytes());
    16         put.addColumn("info".getBytes(),"nationality".getBytes(),"test".getBytes());
    17         proc1.put(put);
    18         proc1.close();
    19         connection.close();
    20         System.out.println("success");
    21 22     }
    23 }

    关于协处理器卸载:

    disable 'user1'
    alter 'user1',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
    enable 'user1'

    协处理器 observer 大致开发流程就是这样的。关于基础 api 放在参考资料的项目中了。

    至此,还留有一个问题就是 hbase 的 endpoint 协处理器,其实它解决的问题及时实现 min、 max、 avg、 sum、 distinct、 group by 等sql功能,这个问题我们放在下期,下期介绍一个基于 hbase 框架之上的框架 -- phoenix,Phoenix之于 Hbase ,就像 hive 之于 Hadoop,会完美的实现 hbase 的 sql 查询操作。

    项目代码地址: https://github.com/fanpengyi/hbase-api  

    -- THE END --

  • 相关阅读:
    自动控制基础MATLAB 2
    钽电容和瓷片电容的对比
    自动控制原理基础 matlab 1
    Altium使用总结1
    Altium 各个层的作用
    C语言联合体的灵活运用
    windows清除日志
    Lingo 优化实例 出版社问题
    Lingo 0-1规划
    用CMD分类
  • 原文地址:https://www.cnblogs.com/fanyi0922/p/12629828.html
Copyright © 2011-2022 走看看