zoukankan      html  css  js  c++  java
  • HBase 写优化之 BulkLoad 实现数据快速入库

    在第一次建立Hbase表的时候,我们可能需要往里面一次性导入大量的初始化数据。我们很自然地想到将数据一条条插入到Hbase中,或者通过MR方式等。但是这些方式不是慢就是在导入的过程的占用Region资源导致效率低下,所以很不适合一次性导入大量数据。本文将针对这个问题介绍如何通过Hbase的BulkLoad方法来快速将海量数据导入到Hbase中。

      总的来说,使用 Bulk Load 方式由于利用了 HBase 的数据信息是按照特定格式存储在 HDFS 里的这一特性,直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后完成巨量数据快速入库的操作,配合 MapReduce 完成这样的操作,不占用 Region 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。Bulk Load 的实现原理是通过一个 MapReduce Job 来实现的,通过 Job 直接生成一个 HBase 的内部 HFile 格式文件,用来形成一个特殊的 HBase 数据表,然后直接将数据文件加载到运行的集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。

    实现原理

      Bulkload过程主要包括三部分:

      1、从数据源(通常是文本文件或其他的数据库)提取数据并上传到HDFS。抽取数据到HDFS和Hbase并没有关系,所以大家可以选用自己擅长的方式进行,本文就不介绍了。

      2、利用MapReduce作业处理实现准备的数据 。这一步需要一个MapReduce作业,并且大多数情况下还需要我们自己编写Map函数,而Reduce函数不需要我们考虑,由HBase提供。该作业需要使用rowkey(行键)作为输出Key;KeyValue、Put或者Delete作为输出Value。MapReduce作业需要使用HFileOutputFormat2来生成HBase数据文件。为了有效的导入数据,需要配置HFileOutputFormat2使得每一个输出文件都在一个合适的区域中。为了达到这个目的,MapReduce作业会使用HadoopTotalOrderPartitioner类根据表的key值将输出分割开来。HFileOutputFormat2的方法configureIncrementalLoad()会自动的完成上面的工作。

      3、告诉RegionServers数据的位置并导入数据。这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导入到相应的区域。

    整个过程图如下:

     

    代码实现

    上面我们已经介绍了Hbase的BulkLoad方法的原理,我们需要写个Mapper和驱动程序,实现如下:

    使用MapReduce生成HFile文件

    public class IteblogBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] items = line.split(" ");
      
                ImmutableBytesWritable rowKey = new ImmutableBytesWritable(items[0].getBytes());
                Put put = new Put(Bytes.toBytes(items[0]));   //ROWKEY
                put.addColumn("f1".getBytes(), "url".getBytes(), items[1].getBytes());
                put.addColumn("f1".getBytes(), "name".getBytes(), items[2].getBytes());
                
                context.write(rowkey, put);
            }
    }
     

    驱动程序

    public class IteblogBulkLoadDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            final String SRC_PATH= "hdfs://iteblog:9000/user/iteblog/input";
            final String DESC_PATH= "hdfs://iteblog:9000/user/iteblog/output";
            Configuration conf = HBaseConfiguration.create();
           
            Job job=Job.getInstance(conf);
            job.setJarByClass(IteblogBulkLoadDriver.class);
            job.setMapperClass(IteblogBulkLoadMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);
            HTable table = new HTable(conf,"blog_info");
            HFileOutputFormat2.configureIncrementalLoad(job,table,table.getRegionLocator());
            FileInputFormat.addInputPath(job,new Path(SRC_PATH));
            FileOutputFormat.setOutputPath(job,new Path(DESC_PATH));
              
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }
     

    通过BlukLoad方式加载HFile文件

    public class LoadIncrementalHFileToHBase {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection connection = ConnectionFactory.createConnection(configuration);
            LoadIncrementalHFiles loder = new LoadIncrementalHFiles(configuration);
            loder.doBulkLoad(new Path("hdfs://iteblog:9000/user/iteblog/output"),new HTable(conf,"blog_info"));
        }
    }
     

    由于Hbase的BulkLoad方式是绕过了Write to WAL,Write to MemStore及Flush to disk的过程,所以并不能通过WAL来进行一些复制数据的操作。后面我将会再介绍如何通过Spark来使用Hbase的BulkLoad方式来初始化数据。

    BulkLoad的使用案例

      1、首次将原始数据集载入 HBase- 您的初始数据集可能很大,绕过 HBase 写入路径可以显著加速此进程。
      2、递增负载 - 要定期加载新数据,请使用 BulkLoad 并按照自己的理想时间间隔分批次导入数据。这可以缓解延迟问题,并且有助于您实现服务级别协议 (SLA)。但是,压缩触发器就是 RegionServer 上的 HFile 数目。因此,频繁导入大量 HFile 可能会导致更频繁地发生大型压缩,从而对性能产生负面影响。您可以通过以下方法缓解此问题:调整压缩设置,确保不触发压缩即可存在的最大 HFile 文件数很高,并依赖于其他因素,如 Memstore 的大小 触发压缩。
      3、数据需要源于其他位置 - 如果当前系统捕获了您想在 HBase 中包含的数据,且因业务原因需要保持活动状态,您可从系统中将数据定期批量加载到 HBase 中,以便可以在不影响系统的前提下对其执行操作。

    生成HFile程序说明:

    ①. 最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。

    ②. 最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。

    ③. MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件。好像最新的版本可以多个列族.

    ④. MR例子中HFileOutputFormat.configureIncrementalLoad(job, table);自动对job进行配置。TotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。

    ⑤. MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。

          说明: 因为在创建HBase表的时候,默认只有一个Region,只有等到这个Region的大小超过一定的阈值之后,才会进行split, 所以为了利用完全分布式加快生成HFile和导入HBase中以及数据负载均衡,所以需要在创建表的时候预先创建分区,可以查阅相关资料(关于HBase调优的资料), 而进行分区时要利用startKey与endKey进行rowKey区间划分(因为导入HBase中,需要rowKey整体有序),所以在导入之前,自己先写一个MapReduce的Job求最小与最大的rowKey, 即startKey与endKey.

    3、说明与注意事项:

    (1)HFile方式在所有的加载方案里面是最快的,不过有个前提——数据是第一次导入,表是空的。如果表中已经有了数据。HFile再导入到hbase的表中会触发split操作。

    (2)最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
    否则报这样的错误:

    java.lang.IllegalArgumentException: Can't read partitions file
    ...
    Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable

    (3)最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer,这个 SorterReducer 可以不指定,因为源码中已经做了判断:

    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    	job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
    	job.setReducerClass(PutSortReducer.class);
    } else {
    	LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    (4) MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件,多列簇需要起多个 job,不过新版本的 Hbase 已经解决了这个限制。

    (5) MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。

    (6)最后一个 Reduce 没有 setNumReduceTasks 是因为,该设置由框架根据region个数自动配置的。

    (7)下边配置部分,注释掉的其实写不写都无所谓,因为看源码就知道configureIncrementalLoad方法已经把固定的配置全配置完了,不固定的部分才需要手动配置。

    public class HFileOutput {
            //job 配置
    	public static Job configureJob(Configuration conf) throws IOException {
    		Job job = new Job(configuration, "countUnite1");
    		job.setJarByClass(HFileOutput.class);
                    //job.setNumReduceTasks(2);  
    		//job.setOutputKeyClass(ImmutableBytesWritable.class);
    		//job.setOutputValueClass(KeyValue.class);
    		//job.setOutputFormatClass(HFileOutputFormat.class);
     
    		Scan scan = new Scan();
    		scan.setCaching(10);
    		scan.addFamily(INPUT_FAMILY);
    		TableMapReduceUtil.initTableMapperJob(inputTable, scan,
    				HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);
    		//这里如果不定义reducer部分,会自动识别定义成KeyValueSortReducer.class 和PutSortReducer.class
                    job.setReducerClass(HFileOutputRedcuer.class);
    		//job.setOutputFormatClass(HFileOutputFormat.class);
    		HFileOutputFormat.configureIncrementalLoad(job, new HTable(
    				configuration, outputTable));
    		HFileOutputFormat.setOutputPath(job, new Path());
                    //FileOutputFormat.setOutputPath(job, new Path()); //等同上句
    		return job;
    	}
     
    	public static class HFileOutputMapper extends
    			TableMapper<ImmutableBytesWritable, LongWritable> {
    		public void map(ImmutableBytesWritable key, Result values,
    				Context context) throws IOException, InterruptedException {
    			//mapper逻辑部分
    			context.write(new ImmutableBytesWritable(Bytes()), LongWritable());
    		}
    	}
     
    	public static class HFileOutputRedcuer extends
    			Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
    		public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
    				Context context) throws IOException, InterruptedException {
                            //reducer逻辑部分
    			KeyValue kv = new KeyValue(row, OUTPUT_FAMILY, tmp[1].getBytes(),
    					Bytes.toBytes(count));
    			context.write(key, kv);
    		}
    	}
    }

    4、Refer:

    1、Hbase几种数据入库(load)方式比较

    http://blog.csdn.net/kirayuan/article/details/6371635

    2、MapReduce生成HFile入库到HBase及源码分析

    http://blog.pureisle.net/archives/1950.html

    3、MapReduce生成HFile入库到HBase

    http://shitouer.cn/2013/02/hbase-hfile-bulk-load/

  • 相关阅读:
    oracle a:=100 和 b=:c 区别
    Oracle为表或字段添加备注
    oracle删除表字段和oracle表增加字段
    oracle数据库的一个表中,怎么设置字段的默认值
    VS2015密钥
    C# 调用WebApi
    OCX ClassId查看
    C++ 调用类的函数
    如何做一个标记为安全的ACTIVEX控件
    Java内存通道
  • 原文地址:https://www.cnblogs.com/houji/p/7382996.html
Copyright © 2011-2022 走看看