zoukankan      html  css  js  c++  java
  • Bulk Load-HBase数据导入最佳实践

    一、概述

    HBase本身提供了非常多种数据导入的方式,通常有两种经常使用方式:

    1、使用HBase提供的TableOutputFormat,原理是通过一个Mapreduce作业将数据导入HBase

    2、还有一种方式就是使用HBase原生Client API

    这两种方式因为须要频繁的与数据所存储的RegionServer通信。一次性入库大量数据时,特别占用资源,所以都不是最有效的。了解过HBase底层原理的应该都知道,HBase在HDFS中是以HFile文件结构存储的,一个比較高效便捷的方法就是使用 “Bulk Loading”方法直接生成HFile,即HBase提供的HFileOutputFormat类。


    二、Bulk Load基本原理

    Bulk Load处理由两个主要步骤组成

    1、准备数据文件

    Bulk Load的第一步。会执行一个Mapreduce作业,当中使用到了HFileOutputFormat输出HBase数据文件:StoreFile。

    HFileOutputFormat的作用在于使得输出的HFile文件能够适应单个region。使用TotalOrderPartitioner类将map输出结果分区到各个不同的key区间中,每一个key区间都相应着HBase表的region。

    2、导入HBase表

    第二步使用completebulkload工具将第一步的结果文件依次交给负责文件相应region的RegionServer,并将文件move到region在HDFS上的存储文件夹中。一旦完毕。将数据开放给clients。

    假设在bulk load准备导入或在准备导入与完毕导入的临界点上发现region的边界已经改变,completebulkload工具会自己主动split数据文件到新的边界上。可是这个过程并非最佳实践,所以用户在使用时须要最小化准备导入与导入集群间的延时,特别是当其它client在同一时候使用其它工具向同一张表导入数据。


    注意:

    bulk load的completebulkload步骤。就是简单的将importtsv或HFileOutputFormat的结果文件导入到某张表中。使用类似下面命令

    hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable

    命令会非常快运行完毕。将/user/todd/myoutput下的HFile文件导入到mytable表中。注意:假设目标表不存在。工具会自己主动创建表。


    三、生成HFile程序说明:

    1、终于输出结果。不管是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
    2、终于输出部分,Value类型是KeyValue 或Put。相应的Sorter各自是KeyValueSortReducer或PutSortReducer。
    3、MR样例中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat仅仅适合一次对单列族组织成HFile文件。


    4、MR样例中HFileOutputFormat.configureIncrementalLoad(job, table);自己主动对job进行配置。SimpleTotalOrderPartitioner是须要先对key进行总体排序,然后划分到每个reduce中,保证每个reducer中的的key最小最大值区间范围,是不会有交集的。由于入库到HBase的时候,作为一个总体的Region,key是绝对有序的。
    5、MR样例中最后生成HFile存储在HDFS上。输出路径下的子文件夹是各个列族。假设对HFile进行入库HBase。相当于move HFile到HBase的Region中。HFile子文件夹的列族内容没有了。


    四、演示样例

    1、创建表

    create 'hfiletable','fm1','fm2'

    2、准备原始数据

    key1	fm1:col1	value1
    key1	fm1:col2	value2
    key1	fm2:col1	value3
    key4	fm1:col1	value4

    3、导入HBase MR

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FsShell;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    public class BulkLoadJob {
        static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class);
    
        public static class BulkLoadMap extends
                Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
    
                String[] valueStrSplit = value.toString().split("	");
                String hkey = valueStrSplit[0];
                String family = valueStrSplit[1].split(":")[0];
                String column = valueStrSplit[1].split(":")[1];
                String hvalue = valueStrSplit[2];
                final byte[] rowKey = Bytes.toBytes(hkey);
                final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
                Put HPut = new Put(rowKey);
                byte[] cell = Bytes.toBytes(hvalue);
                HPut.add(Bytes.toBytes(family), Bytes.toBytes(column), cell);
                context.write(HKey, HPut);
    
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            String inputPath = args[0];
            String outputPath = args[1];
            HTable hTable = null;
            try {
                Job job = Job.getInstance(conf, "ExampleRead");
                job.setJarByClass(BulkLoadJob.class);
                job.setMapperClass(BulkLoadJob.BulkLoadMap.class);
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                job.setMapOutputValueClass(Put.class);
                // speculation
                job.setSpeculativeExecution(false);
                job.setReduceSpeculativeExecution(false);
                // in/out format
    			job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(HFileOutputFormat2.class);
    
                FileInputFormat.setInputPaths(job, inputPath);
                FileOutputFormat.setOutputPath(job, new Path(outputPath));
    
                hTable = new HTable(conf, args[2]);
                HFileOutputFormat2.configureIncrementalLoad(job, hTable);
    
                if (job.waitForCompletion(true)) {
                    FsShell shell = new FsShell(conf);
                    try {
                        shell.run(new String[]{"-chmod", "-R", "777", args[1]});
                    } catch (Exception e) {
                        logger.error("Couldnt change the file permissions ", e);
                        throw new IOException(e);
                    }
                    //载入到hbase表
                    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                    loader.doBulkLoad(new Path(outputPath), hTable);
                } else {
                    logger.error("loading failed.");
                    System.exit(1);
                }
    
            } catch (IllegalArgumentException e) {
                e.printStackTrace();
            } finally {
                if (hTable != null) {
                    hTable.close();
                }
            }
        }
    }

    4、查看数据

    hbase(main):003:0> scan 'hfiletable'
    ROW                                                   COLUMN+CELL                                                                                                                                                  
     key2                                                 column=fm1:col1, timestamp=1437794332921, value=value1                                                                                                       
     key2                                                 column=fm1:col2, timestamp=1437794332921, value=value2                                                                                                       
     key2                                                 column=fm2:col1, timestamp=1437794332921, value=value3                                                                                                       
     key3                                                 column=fm1:col1, timestamp=1437794332921, value=value4                                                                                                       
    2 row(s) in 0.1910 seconds
    

    五、总结

    尽管importtsv工具使用与大多数场景,用户有时希望自己编程生成数据,或以其它格式导入数据,比方importtsv须要在导入前确定每条数据column维度,一旦我们的数据的维度是依据数据内容本身的。importtsv就无法满足需求。这时就须要对工具改造。能够查看ImportTsv.java和HFileOutputFormat的javaDoc。

    completebulkload相同能够编程化实现,能够查看LoadIncrementalHFiles类。







  • 相关阅读:
    WinCE数据库开发时整出来的一个致命的BUG
    WinCE下读取注册表获得SD卡路径
    Cookie的创建、读写和删除
    Http Module 介绍[转]
    贴几个从Dnt论坛代码里边扣出来的函数
    SqlServer判断数据库、表、存储过程、函数是否存在
    WinCE中C#WinForm利用Web Service查询数据库
    关于WinCE和PC中同一字符串的GetHashCode()结果不同的理解
    DataTable中数据记录的统计
    C#读取XML时自动过滤掉注释部分
  • 原文地址:https://www.cnblogs.com/yutingliuyl/p/6718502.html
Copyright © 2011-2022 走看看