zoukankan      html  css  js  c++  java
  • ImportTsv-HBase数据导入工具

    一、概述

    HBase官方提供了基于Mapreduce的批量数据导入工具:Bulk load和ImportTsv。关于Bulk load大家可以看下我另一篇博文

    通常HBase用户会使用HBase API导数,但是如果一次性导入大批量数据,可能占用大量Regionserver资源,影响存储在该Regionserver上其他表的查询,本文将会从源码上解析ImportTsv数据导入工具,探究如何高效导入数据到HBase。


    二、ImportTsv介绍

    ImportTsv是Hbase提供的一个命令行工具,可以将存储在HDFS上的自定义分隔符(默认 )的数据文件,通过一条命令方便的导入到HBase表中,对于大数据量导入非常实用,其中包含两种方式将数据导入到HBase表中:

    第一种是使用TableOutputformat在reduce中插入数据;

    第二种是先生成HFile格式的文件,再执行一个叫做CompleteBulkLoad的命令,将文件move到HBase表空间目录下,同时提供给client查询。


    三、源码解析

    本文基于CDH5 HBase0.98.1,ImportTsv的入口类是org.apache.hadoop.hbase.mapreduce.ImportTsv

    1. String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);  
    2. String columns[] = conf.getStrings(COLUMNS_CONF_KEY);  
    3. if (hfileOutPath != null) {  
    4.   if (!admin.tableExists(tableName)) {  
    5.     LOG.warn(format("Table '%s' does not exist.", tableName));  
    6.     // TODO: this is backwards. Instead of depending on the existence of a table,  
    7.     // create a sane splits file for HFileOutputFormat based on data sampling.  
    8.     createTable(admin, tableName, columns);  
    9.   }  
    10.   HTable table = new HTable(conf, tableName);  
    11.   job.setReducerClass(PutSortReducer.class);  
    12.   Path outputDir = new Path(hfileOutPath);  
    13.   FileOutputFormat.setOutputPath(job, outputDir);  
    14.   job.setMapOutputKeyClass(ImmutableBytesWritable.class);  
    15.   if (mapperClass.equals(TsvImporterTextMapper.class)) {  
    16.     job.setMapOutputValueClass(Text.class);  
    17.     job.setReducerClass(TextSortReducer.class);  
    18.   } else {  
    19.     job.setMapOutputValueClass(Put.class);  
    20.     job.setCombinerClass(PutCombiner.class);  
    21.   }  
    22.   HFileOutputFormat.configureIncrementalLoad(job, table);  
    23. else {  
    24.   if (mapperClass.equals(TsvImporterTextMapper.class)) {  
    25.     usage(TsvImporterTextMapper.class.toString()  
    26.         + " should not be used for non bulkloading case. use "  
    27.         + TsvImporterMapper.class.toString()  
    28.         + " or custom mapper whose value type is Put.");  
    29.     System.exit(-1);  
    30.   }  
    31.   // No reducers. Just write straight to table. Call initTableReducerJob  
    32.   // to set up the TableOutputFormat.  
    33.   TableMapReduceUtil.initTableReducerJob(tableName, null, job);  
    34.   job.setNumReduceTasks(0);  
    35. }  

    从ImportTsv.createSubmittableJob方法中判断参数BULK_OUTPUT_CONF_KEY开始,这步直接影响ImportTsv的Mapreduce作业最终以哪种方式入HBase库

    如果不为空并且用户没有自定义Mapper实现类(参数importtsv.mapper.class)时,则使用PutSortReducer,其中会对Put排序,如果每行记录有很多column,则会占用Reducer大量的内存资源进行排序。

    1. Configuration conf = job.getConfiguration();  
    2. HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));  
    3. job.setOutputFormatClass(TableOutputFormat.class);  

    如果为空,调用TableMapReduceUtil.initTableReducerJob初始化TableOutputformat的Reducer输出,此方式不需要使用Reducer,因为直接在mapper的Outputformat中会批量的调用Put API将数据提交到Regionserver上(相当于并行的执行HBase Put API)


    四、实战

    1、使用TableOutputformat的Put API上传数据,非bulk-loading

    1. $ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c <tablename> <hdfs-inputdir>  

    2、使用bulk-loading生成StoreFiles(HFile)

    step1、生成Hfile

    1. $ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c -Dimporttsv.bulk.output=hdfs://storefile-outputdir <tablename> <hdfs-data-inputdir>  

    step2、完成导入

    1. $ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>  


    五、总结

    在使用ImportTsv时,一定要注意参数importtsv.bulk.output的配置,通常来说使用Bulk output的方式对Regionserver来说更加友好一些,这种方式加载数据几乎不占用Regionserver的计算资源,因为只是在HDFS上移动了HFile文件,然后通知HMaster将该Regionserver的一个或多个region上线。

    ----------------------------------------------

    1.hbase到hdfs
    hbase org.apache.hadoop.hbase.mapreduce.Export "ns:core_tran_info" /hbase2hdfs
    注:hdfs文件系统路径/hbase2hdfs不要存在,不然报错

    2.hdfs到hbase
    hbase org.apache.hadoop.hbase.mapreduce.Driver import 'hdfs2hbase'   /hbase2hdfs/*  
    注:hdfs2hbase为hbase中已存在的表,且表hdfs2hbase的column family要和hdfs文件路下导出的hbase表的列族相同。



    命令说明importtsv
    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c -Dimporttsv.bulk.output=hdfs://storefile-outputdir <tablename> <hdfs-data-inputdir>

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>

    导入tsv文件
    第一种:
    step1:根据hdfs中的文件生成Hfile
    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,cf:a,cf:b -Dimporttsv.bulk.output=hdfs:///storefile tsvtab /hdfs2hbase/hdfs2hbase.tsv
    注:
    (1)tsv文件不能有文件头;
    (2)importtsv.columns=HBASE_ROW_KEY必须要指定列,且必须要有HBASE_ROW_KEY
    (3)Dimporttsv.bulk.output=hdfs:///storefile,hdfs上的storefile目录开始不能存在
    step2:完成导入
    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs:///storefile tsvtab


    第二种:
    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-1.3.1.jar importtsv   -Dimporttsv.columns=HBASE_ROW_KEY,baseinfo:INTERNAL_KEY,baseinfo:TRAN_DATE,baseinfo:SOURCE_TYPE,baseinfo:TERMINAL_ID,baseinfo:BRANCH,baseinfo:OFFICER_ID,baseinfo:TRAN_TYPE,baseinfo:EFFECT_DATE,baseinfo:POST_DATE,baseinfo:REVERSAL_TRAN_TYPE,baseinfo:REVERSAL_DATE,baseinfo:STMT_DATE,baseinfo:TRAN_AMT,baseinfo:REFERENCE,baseinfo:PREVIOUS_BAL_AMT,baseinfo:ACTUAL_BAL_AMT,baseinfo:TFR_INTERNAL_KEY,baseinfo:TFR_SEQ_NO, dshuangfu:teacher1 /user/hac/input/2-1/dshuangfu_hbase_zhongyuan111.tsv

    导入csv文件
    step1:根据hdfs中的文件生成Hfile
    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf:a -Dimporttsv.bulk.output=hdfs:///storefile/csvtab csvtab /hdfs2hbase/hdfs2hbase.csv
    注:
    (1)tsv文件不能有文件头;
    (2)importtsv.columns=HBASE_ROW_KEY必须要指定列,且必须要有HBASE_ROW_KEY
    (3)Dimporttsv.bulk.output=hdfs:///storefile,hdfs上的storefile目录开始不能存在
    (4)
    step2:完成导入
    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs:///storefile/csvtab csvtab

  • 相关阅读:
    Redis 安全
    Redis 数据备份与恢复
    Redis 服务器
    Redis 连接
    Redis 脚本
    Linux中使用netstat命令的基本操作,排查端口号的占用情况
    ElasticSearch 常用查询语句
    GO代码风格指南 Uber Go (转载)
    coding 注意事项(总结中)
    Byte字节
  • 原文地址:https://www.cnblogs.com/cxhfuujust/p/7778294.html
Copyright © 2011-2022 走看看