zoukankan      html  css  js  c++  java
  • Hbase框架原理及相关的知识点理解、Hbase访问MapReduce、Hbase访问Java API、Hbase shell及Hbase性能优化总结

    转自:http://blog.csdn.net/zhongwen7710/article/details/39577431

    本blog的内容包含:
    第一部分:Hbase框架原理理解
    第二部分:Hbase调用MapReduce函数使用理解
    第三部分:Hbase调用Java API使用理解
    第四部分:Hbase Shell操作
    第五部分:Hbase建表、读写操作方式性能优化总结
     

    第一部分:Hbase框架原理理解
     
    • 概述



    HBase是一个构建在HDFS上的分布式列存储系统;
    HBase是基于Google BigTable模型开发的,典型的key/value系统;
    HBase是Apache Hadoop生态系统中的重要一员,主要用于海量结构化数据存储;
    从逻辑上讲,HBase将数据按照表、行和列进行存储。
    与hadoop一样,Hbase目标主要依靠横向扩展,通过不断增加廉价的商用服务器,来增加计算和存储能力。
    Hbase表的特点
    大:一个表可以有数十亿行,上百万列;
    无模式:每行都有一个可排序的主键和任意多的列,列可以根据需要动态的增加,同一张表中不同的行可以有截然不同的列;
    面向列:面向列(族)的存储和权限控制,列(族)独立检索;
    稀疏:空(null)列并不占用存储空间,表可以设计的非常稀疏;
    数据多版本:每个单元中的数据可以有多个版本,默认情况下版本号自动分配,是单元格插入时的时间戳;
    数据类型单一:Hbase中的数据都是字符串,没有类型。

    • Hbase数据模型

    Hbase逻辑视图

    注意上图中的英文说明

    Hbase基本概念

    RowKey:是Byte array,是表中每条记录的“主键”,方便快速查找,Rowkey的设计非常重要。
    Column Family:列族,拥有一个名称(string),包含一个或者多个相关列
    Column:属于某一个columnfamily,familyName:columnName,每条记录可动态添加
    Version Number:类型为Long,默认值是系统时间戳,可由用户自定义
    Value(Cell):Byte array

    • Hbase物理模型

    每个column family存储在HDFS上的一个单独文件中,空值不会被保存。
    Key 和 Version number在每个 column family中均有一份;
    HBase 为每个值维护了多级索引,即:<key, column family, column name, timestamp>

    物理存储:
    1、Table中所有行都按照row key的字典序排列;
    2、Table在行的方向上分割为多个Region;
    3、Region按大小分割的,每个表开始只有一个region,随着数据增多,region不断增大,当增大到一个阀值的时候,region就会等分会两个新的region,之后会有越来越多的region;
    4、Region是Hbase中分布式存储和负载均衡的最小单元,不同Region分布到不同RegionServer上。

    5、Region虽然是分布式存储的最小单元,但并不是存储的最小单元。Region由一个或者多个Store组成,每个store保存一个columns family;每个Strore又由一个memStore和0至多个StoreFile组成,StoreFile包含HFile;memStore存储在内存中,StoreFile存储在HDFS上。

    • HBase架构及基本组件

    Hbase基本组件说明:

    Client

    包含访问HBase的接口,并维护cache来加快对HBase的访问,比如region的位置信息

    Master

    为Region server分配region

    负责Region server的负载均衡

    发现失效的Region server并重新分配其上的region

    管理用户对table的增删改查操作

    Region Server

    Regionserver维护region,处理对这些region的IO请求

    Regionserver负责切分在运行过程中变得过大的region

    Zookeeper作用

    通过选举,保证任何时候,集群中只有一个master,Master与RegionServers 启动时会向ZooKeeper注册

    存贮所有Region的寻址入口

    实时监控Region server的上线和下线信息。并实时通知给Master

    存储HBase的schema和table元数据

    默认情况下,HBase 管理ZooKeeper 实例,比如, 启动或者停止ZooKeeper

    Zookeeper的引入使得Master不再是单点故障

    Write-Ahead-Log(WAL)

    该机制用于数据的容错和恢复:

    每个HRegionServer中都有一个HLog对象,HLog是一个实现Write Ahead Log的类,在每次用户操作写入MemStore的同时,也会写一份数据到HLog文件中(HLog文件格式见后续),HLog文件定期会滚动出新的,并删除旧的文件(已持久化到StoreFile中的数据)。当HRegionServer意外终止后,HMaster会通过Zookeeper感知到,HMaster首先会处理遗留的 HLog文件,将其中不同Region的Log数据进行拆分,分别放到相应region的目录下,然后再将失效的region重新分配,领取 到这些region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后flush到StoreFiles,完成数据恢复

    HBase容错性
    Master容错:Zookeeper重新选择一个新的Master
    无Master过程中,数据读取仍照常进行;
    无master过程中,region切分、负载均衡等无法进行;
    RegionServer容错:定时向Zookeeper汇报心跳,如果一旦时间内未出现心跳,Master将该RegionServer上的Region重新分配到其他RegionServer上,失效服务器上“预写”日志由主服务器进行分割并派送给新的RegionServer
    Zookeeper容错:Zookeeper是一个可靠地服务,一般配置3或5个Zookeeper实例
    Region定位流程:

    寻找RegionServer

    ZooKeeper--> -ROOT-(单Region)--> .META.--> 用户表

    -ROOT-
    表包含.META.表所在的region列表,该表只会有一个Region;

    Zookeeper中记录了-ROOT-表的location。

    .META.

    表包含所有的用户空间region列表,以及RegionServer的服务器地址。

    • Hbase使用场景

    storing large amounts  of data(100s of TBs)
    need high write throughput
    need efficient random access(key lookups) within large data sets
    need to scale gracefully with data
    for structured and semi-structured data
    don't need fullRDMS capabilities(cross row/cross table transaction, joins,etc.)

    大数据量存储,大数据量高并发操作

    需要对数据随机读写操作

    读写访问均是非常简单的操作

    • Hbase与HDFS对比

    两者都具有良好的容错性和扩展性,都可以扩展到成百上千个节点;
    HDFS适合批处理场景
    不支持数据随机查找
    不适合增量数据处理

    不支持数据更新

    • 参考文档:

    1、http://www.alidata.org/archives/1509(存储模型比较详细)

    2、http://www.searchtb.com/2011/01/understanding-hbase.html(技术框架以及存储模型)

    3、http://wenku.baidu.com/view/b46eadd228ea81c758f578f4.html(读和写的流程比较详细)

     
     

    第二部分:Hbase调用MapReduce函数使用理解

    概述:

    Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。


    一个简单示例:

    说明:从日志表中,统计每个IP访问网站目录的总数

    [java] view plain copy
     
    1. package man.ludq.hbase;  
    2.   
    3. import java.io.IOException;  
    4.   
    5. import org.apache.hadoop.conf.Configuration;  
    6. import org.apache.hadoop.hbase.HBaseConfiguration;  
    7. import org.apache.hadoop.hbase.client.Put;  
    8. import org.apache.hadoop.hbase.client.Result;  
    9. import org.apache.hadoop.hbase.client.Scan;  
    10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
    11. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
    12. import org.apache.hadoop.hbase.mapreduce.TableMapper;  
    13. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
    14. import org.apache.hadoop.hbase.util.Bytes;  
    15. import org.apache.hadoop.io.IntWritable;  
    16. import org.apache.hadoop.io.Text;  
    17. import org.apache.hadoop.mapreduce.Job;  
    18.   
    19. public class ExampleTotalMapReduce{  
    20.     public static void main(String[] args) {  
    21.         try{  
    22.             Configuration config = HBaseConfiguration.create();  
    23.             Job job = new Job(config,"ExampleSummary");  
    24.             job.setJarByClass(ExampleTotalMapReduce.class);     // class that contains mapper and reducer  
    25.   
    26.             Scan scan = new Scan();  
    27.             scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs  
    28.             scan.setCacheBlocks(false);  // don't set to true for MR jobs  
    29.             // set other scan attrs  
    30.             //scan.addColumn(family, qualifier);  
    31.             TableMapReduceUtil.initTableMapperJob(  
    32.                     "access-log",        // input table  
    33.                     scan,               // Scan instance to control CF and attribute selection  
    34.                     MyMapper.class,     // mapper class  
    35.                     Text.class,         // mapper output key  
    36.                     IntWritable.class,  // mapper output value  
    37.                     job);  
    38.             TableMapReduceUtil.initTableReducerJob(  
    39.                     "total-access",        // output table  
    40.                     MyTableReducer.class,    // reducer class  
    41.                     job);  
    42.             job.setNumReduceTasks(1);   // at least one, adjust as required  
    43.   
    44.             boolean b = job.waitForCompletion(true);  
    45.             if (!b) {  
    46.                 throw new IOException("error with job!");  
    47.             }   
    48.         } catch(Exception e){  
    49.             e.printStackTrace();  
    50.         }  
    51.     }  
    52.   
    53.     public static class MyMapper extends TableMapper<Text, IntWritable>  {  
    54.   
    55.         private final IntWritable ONE = new IntWritable(1);  
    56.         private Text text = new Text();  
    57.   
    58.         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {  
    59.             String ip = Bytes.toString(row.get()).split("-")[0];  
    60.             String url = new String(value.getValue(Bytes.toBytes("info"), Bytes.toBytes("url")));  
    61.             text.set(ip+"&"+url);  
    62.             context.write(text, ONE);  
    63.         }  
    64.     }  
    65.   
    66.     public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {  
    67.         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {  
    68.             int sum = 0;  
    69.             for (IntWritable val : values) {  
    70.                 sum += val.get();  
    71.             }  
    72.   
    73.             Put put = new Put(key.getBytes());  
    74.             put.add(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));  
    75.   
    76.             context.write(null, put);  
    77.         }  
    78.     }  
    79. }  

    参考文档:

    1、Mapreduce读取和写入Hbase(从A表读取数据,统计结果放入B表,非常详细,附有代码说明以及流程)
     
    2、Mapreduce操作Hbase(官方文档,包括 读/读写/多表输出/输出到文件/输出到RDBMS/Job中访问其他的HBase Tables)
     
     

    第三部分:Hbase调用Java API使用理解
    Hbase的访问方式
    1、Native Java API:最常规和高效的访问方式;
    2、HBase Shell:HBase的命令行工具,最简单的接口,适合HBase管理使用;
    3、Thrift Gateway:利用Thrift序列化技术,支持C++,PHP,Python等多种语言,适合其他异构系统在线访问HBase表数据;
    4、REST Gateway:支持REST 风格的Http API访问HBase, 解除了语言限制;
    5、MapReduce:直接使用MapReduce作业处理Hbase数据;
    6、使用Pig/hive处理Hbase数据。

    常用Java API的用法:

    1、加载配置

    Configuration config = HBaseConfiguration.create(); 
    //可以自定义配置,也可以从自定义配置文件中读取
    /*config.set("hbase.zookeeper.property.clientPort", "4181");
    config.set("hbase.zookeeper.quorum", "hadoop.datanode5.com,hadoop.datanode2.com,hadoop.datanode3.com");
    config.set("hbase.master", "hadoop.datanode3.com\:600000");*/

    2、表的创建、表信息修改、表删除

    [java] view plain copy
     
    1. HBaseAdmin admin = new HBaseAdmin(config);  
    2. //创建表  
    3. HTableDescriptor htd = new HTableDescriptor(tableName);  
    4. htd.addFamily(new HColumnDescriptor("cf1"));  
    5. htd.addFamily(new HColumnDescriptor("cf2"));  
    6. admin.createTable(htd);  
    7. //修改表信息  
    8. admin.disableTable(tableName);  
    9. // modifying existing ColumnFamily  
    10. admin.modifyColumn(tableName, new HColumnDescriptor("cf1"));    
    11. admin.enableTable(tableName);   
    12. //删除表  
    13. admin.disableTable(Bytes.toBytes(tableName));  
    14. admin.deleteTable(Bytes.toBytes(tableName));  
    3、添加记录
    [java] view plain copy
     
    1. /** 在多次使用时,建议用HTablePool 
    2.   HTable table = new HTable(config, tableName);  
    3.   => 
    4.   HTablePool pool = new HTablePool(config, 1000); 
    5.   HTableInterface table = pool.getTable(tableName);*/  
    6. HTable table = new HTable(config, tableName);  
    7.   
    8. /** 
    9.  * 在插入操作时,默认不适用任何缓存 
    10.  * 可自定义使用缓存,以及缓存大小 
    11.  * 每个任务最后需要手工调用 flushCommits(); 
    12.  */  
    13. /*table.setAutoFlush(false); 
    14. table.setWriteBufferSize(1024);*/  
    15.   
    16. Put put1 = new Put(Bytes.toBytes(rowKey));  
    17. if (ts == 0) {  
    18.     put1.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));  
    19. else {  
    20.        //自定义版本时,从自定义的版本号,类型为long  
    21.     put1.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts,Bytes.toBytes(value));  
    22. }  
    23. table.put(put1);  
    24. //table.flushCommits();  
    4、查询,根据Rowkey查询
    Get get1 = new Get(Bytes.toBytes(rowKey));
    Result result = table.get(get1);
    System.out.println("get result:" + Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier))));
    Result[] result = table.get(List<Get>);//查询指定Rowkey的多条记录

    5、查询,指定条件和rowkey区间查询
    [java] view plain copy
     
    1. Scan scan = new Scan();  
    2. //默认缓存大小为1,设置成一个合理的值,可以减少scan过程中next()的时间开销,代价是客户端的内存  
    3. scan.setCaching(500);  
    4. scan.setCacheBlocks(false);  
    5.   
    6. //根据startRowKey、endRowKey查询  
    7. //Scan scan = new Scan(Bytes.toBytes("startRowKey"), Bytes.toBytes("endRowKey"));  
    8.   
    9. //rowKey之外的过滤条件,在List中可以add;  
    10. /**List<Filter> filters = new ArrayList<Filter>(); 
    11. Filter filter = new SingleColumnValueFilter("familyName".getBytes(),  
    12.         "qualifierName".getBytes(),  
    13.         CompareOp.EQUAL, 
    14.         Bytes.toBytes("value")); 
    15. filters.add(filter); 
    16. scan.setFilter(new FilterList(filters));*/  
    17.   
    18. ResultScanner scanner = table.getScanner(scan);  
    19.   
    20. System.out.println("scan result list:");  
    21.           
    22. for (Result result : scanner) {  
    23.     System.out.println(Bytes.toString(result.getRow()));  
    24.     System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("data1"))));  
    25.     System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("data2"))));  
    26. }  
    27. scanner.close();  
    参考:

    1、http://www.taobaotest.com/blogs/1605

    2、http://abloz.com/hbase/book.html#data_model_operations(官网示例)

    参考文献:http://blog.csdn.net/woshiwanxin102213/article/details/17676961

    第四部分:Hbase Shell操作

    Hbase的访问方式
    1、Native Java API:最常规和高效的访问方式;
    2、HBase Shell:HBase的命令行工具,最简单的接口,适合HBase管理使用;
    3、Thrift Gateway:利用Thrift序列化技术,支持C++,PHP,Python等多种语言,适合其他异构系统在线访问HBase表数据;
    4、REST Gateway:支持REST 风格的Http API访问HBase, 解除了语言限制;
    5、MapReduce:直接使用MapReduce作业处理Hbase数据;
    6、使用Pig/hive处理Hbase数据。

    常用Hbase Shell的基本用法:
    hbase shell常用的操作命令有create,describe,disable,drop,list,scan,put,get,delete,deleteall,count,status等,通过help可以看到详细的用法。
    1、打开Hbase shell
    hadoop@ubuntu:/usr$ hbase shell
    2、查询表List
    hbase(main):001:0> list
    3、建表(create)
    hbase(main):008:0> create 'scores','grad','course'
    4、添加数据(表scores,rowkey为zkb 列族grad,列名为”” 值为5)
    hbase(main):013:0> put 'scores','zkd','grade:','5'
    5、 给zkb这一行的数据的列族course添加一列<math,97> (put)
    hbase(main):016:0> put 'scores','zkd','course:math','97'
    6、查询某一条数据(get),根据rowkey查找
    hbase(main):024:0> get 'scores','zkd'
    7、查询多条数据(scan)
    格式:scan命令可以指定startrow,stoprow来scan多个row,例如:scan 'user_test',{COLUMNS =>'info:username',LIMIT =>10, STARTROW  => 'test',STOPROW=>'test2'}, {}里边的是可选项
    hbase(main):003:0> scan 'scores',{COLUMNS=>'course:art',LIMIT=>1,STARTROW=>'a',STOPROW=>'z'}
    8、删除记录(只有一个column)
    delete 'scores','1','course:art'
    9、删除rowkey的所有column
    deleteall 'scores','1'
    10、删除scores表
    hbase(main):004:0> disable 'scores'
    hbase(main):005:0> drop 'scores'

    参考:

    http://www.taobaotest.com/blogs/qa?bid=13871

    http://blog.csdn.net/woshiwanxin102213/article/details/17611457
     
     
    第五部分:Hbase建表、读写操作方式性能优化总结
     

    1、表的设计

    1.1、Column Family

    由于Hbase是一个面向列族的存储器,调优和存储都是在列族这个层次上进行的,最好使列族成员都有相同的"访问模式(access pattern)"和大小特征;
    在一张表里不要定义太多的column family。目前Hbase并不能很好的处理超过2~3个column family的表。因为某个column family在flush的时候,它邻近的column family也会因关联效应被触发flush,最终导致系统产生更多的I/O。

    1.2、Row Key

    Row Key 设计原则:
    1)Rowkey长度原则,Rowkey是一个二进制码流,可以是任意字符串,最大长度64KB,实际应用中一般为10~100bytes,存为byte[]字节数组,一般设计成定长的。建议是越短越好,不要超过16个字节。原因一数据的持久化文件HFile中是按照KeyValue存储的,如果Rowkey过长比如100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;原因二MemStore将缓存部分数据到内存,如果Rowkey字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey的字节长度越短越好。原因三目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。
    2)是Rowkey散列原则,如果Rowkey是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将Rowkey的高位作为散列字段,由程序循环生成,低位放时间字段,这样将提高数据均衡分布在每个Regionserver实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息将产生所有新数据都在一个RegionServer上堆积的热点现象,这样在做数据检索的时候负载将会集中在个别RegionServer,降低查询效率。
    3)Rowkey唯一原则,必须在设计上保证其唯一性。
    row key是按照字典序存储,因此,设计row key时,要充分利用这个排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放在一块。
    举个例子:如果最近写入HBase表中的数据是最可能被访问的,可以考虑将时间戳作为row key的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE – timestamp作为row key,这样能保证新写入的数据在读取时可以被快速命中。

    1.3、 In Memory

    创建表的时候,可以通过HColumnDescriptor.setInMemory(true)将表放到RegionServer的缓存中,保证在读取的时候被cache命中。

    1.4 、Max Version

    创建表的时候,可以通过HColumnDescriptor.setMaxVersions(intmaxVersions)设置表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置setMaxVersions(1)。

    1.5、 Time to Live(设置数据存储的生命周期)

    创建表的时候,可以通过HColumnDescriptor.setTimeToLive(inttimeToLive)设置表中数据的存储生命期,过期数据将自动被删除,例如如果只需要存储最近两天的数据,那么可以设置setTimeToLive(2 * 24 * 60 * 60)。

    1.6、 Compact & Split

    在HBase中,数据在更新时首先写入WAL 日志(HLog)和内存(MemStore)中,MemStore中的数据是排序的,当MemStore累计到一定阈值时,就会创建一个新的MemStore,并且将老的MemStore添加到flush队列,由单独的线程flush到磁盘上,成为一个StoreFile。于此同时, 系统会在zookeeper中记录一个redo point,表示这个时刻之前的变更已经持久化了(minor compact)。
    StoreFile是只读的,一旦创建后就不可以再修改。因此Hbase的更新其实是不断追加的操作。当一个Store中的StoreFile达到一定的阈值后,就会进行一次合并(major compact),将对同一个key的修改合并到一起,形成一个大的StoreFile,当StoreFile的大小达到一定阈值后,又会对 StoreFile进行分割(split),等分为两个StoreFile。
    由于对表的更新是不断追加的,处理读请求时,需要访问Store中全部的StoreFile和MemStore,将它们按照row key进行合并,由于StoreFile和MemStore都是经过排序的,并且StoreFile带有内存中索引,通常合并过程还是比较快的。
    实际应用中,可以考虑必要时手动进行major compact,将同一个row key的修改进行合并形成一个大的StoreFile。同时,可以将StoreFile设置大些,减少split的发生。

    1.7、 Pre-Creating Regions

    默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候,所有的HBase客户端都向这一个region写数据,直到这个region足够大了才进行切分。一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入HBase时,会按照region分区情况,在集群内做数据的负载均衡。                             有关预分区,详情参见:TableCreation: Pre-Creating Regions,下面是一个例子:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
     
    1. public static booleancreateTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)  
    2. throws IOException {  
    3.   try {  
    4.     admin.createTable(table, splits);  
    5.     return true;  
    6.   } catch (TableExistsException e) {  
    7.     logger.info("table " +table.getNameAsString() + " already exists");  
    8.     // the table already exists...  
    9.     return false;  
    10.   }  
    11. }  
    12.    
    13. public static byte[][]getHexSplits(String startKey, String endKey, int numRegions) {  
    14.   byte[][] splits = new byte[numRegions-1][];  
    15.   BigInteger lowestKey = newBigInteger(startKey, 16);  
    16.   BigInteger highestKey = newBigInteger(endKey, 16);  
    17.   BigInteger range =highestKey.subtract(lowestKey);  
    18.   BigInteger regionIncrement =range.divide(BigInteger.valueOf(numRegions));  
    19.   lowestKey = lowestKey.add(regionIncrement);  
    20.   for(int i=0; i < numRegions-1;i++) {  
    21.     BigInteger key =lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));  
    22.     byte[] b = String.format("%016x",key).getBytes();  
    23.     splits[i] = b;  
    24.   }  
    25.   return splits;  
    26. }  

    2、写表操作

    2.1 多HTable并发写

    创建多个HTable客户端用于写操作,提高写数据的吞吐量,一个例子:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
     
    1. static final Configurationconf = HBaseConfiguration.create();  
    2. static final Stringtable_log_name = “user_log”;  
    3. wTableLog = newHTable[tableN];  
    4. for (int i = 0; i <tableN; i++) {  
    5.     wTableLog[i] = new HTable(conf,table_log_name);  
    6.     wTableLog[i].setWriteBufferSize(5 * 1024 *1024); //5MB  
    7.     wTableLog[i].setAutoFlush(false);  
    8. }  

    2.2 HTable参数设置

    2.2.1 Auto Flush

    通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到 HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。默认情况下auto flush是开启的。保证最后手动HTable.flushCommits()或HTable.close()。

    2.2.2 Write Buffer

    通过调用HTable.setWriteBufferSize(writeBufferSize)方法可以设置 HTable客户端的写buffer大小,如果新设置的buffer小于当前写buffer中的数据时,buffer将会被flush到服务端。其 中,writeBufferSize的单位是byte字节数,可以根据实际写入数据量的多少来设置该值。

    2.2.3 WAL Flag

    在HBae中,客户端向集群中的RegionServer提交数据时(Put/Delete操作),首先会先写WAL(Write Ahead Log)日志(即HLog,一个RegionServer上的所有Region共享一个HLog),只有当WAL日志写成功后,再接着写 MemStore,然后客户端被通知提交数据成功;如果写WAL日志失败,客户端则被通知提交失败。这样做的好处是可以做到RegionServer宕机 后的数据恢复。

    因此,对于相对不太重要的数据,可以在Put/Delete操作时,通过调用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函数,放弃写WAL日志,从而提高数据写入的性能。

    值得注意的是:谨慎选择关闭WAL日志,因为这样的话,一旦RegionServer宕机,Put/Delete的数据将会无法根据WAL日志进行恢复。

    2.3 批量写

    通过调用HTable.put(Put)方法可以将一个指定的row key记录写入HBase,同样HBase提供了另一个方法:通过调用HTable.put(List<Put>)方法可以将指定的row key列表,批量写入多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高,网络传输RTT高的情景下可能带来明显的性能提升。

    2.4 多线程并发写

    在客户端开启多个HTable写线程,每个写线程负责一个HTable对象的flush操作,这样结合定时flush和写 buffer(writeBufferSize),可以既保证在数据量小的时候,数据可以在较短时间内被flush(如1秒内),同时又保证在数据量大的 时候,写buffer一满就及时进行flush。下面给个具体的例子:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
     
    1. for (int i = 0; i <threadN; i++) {  
    2.     Thread th = new Thread() {  
    3.         public void run() {  
    4.             while (true) {  
    5.                 try {  
    6.                     sleep(1000); //1 second  
    7.                 } catch (InterruptedExceptione) {  
    8.                     e.printStackTrace();  
    9.                 }  
    10. synchronized (wTableLog[i]) {  
    11.                     try {  
    12.                         wTableLog[i].flushCommits();  
    13.                     } catch (IOException e) {  
    14.                         e.printStackTrace();  
    15.                     }  
    16.                 }  
    17.             }  
    18. }  
    19.     };  
    20.     th.setDaemon(true);  
    21.     th.start();  
    22. }  

    3、读表操作

    3.1 多HTable并发读

    创建多个HTable客户端用于读操作,提高读数据的吞吐量,一个例子:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
     
    1. static final Configurationconf = HBaseConfiguration.create();  
    2. static final Stringtable_log_name = “user_log”;  
    3. rTableLog = newHTable[tableN];  
    4. for (int i = 0; i <tableN; i++) {  
    5.     rTableLog[i] = new HTable(conf, table_log_name);  
    6.     rTableLog[i].setScannerCaching(50);  
    7. }  

    3.2 HTable参数设置

    3.2.1 Scanner Caching

    hbase.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的时间开销,代价是 scanner需要通过客户端的内存来维持这些被cache的行记录。

    有三个地方可以进行配置:1)在HBase的conf配置文件中进行配置;2)通过调用HTable.setScannerCaching(int scannerCaching)进行配置;3)通过调用Scan.setCaching(int caching)进行配置。三者的优先级越来越高。

    3.2.2 Scan AttributeSelection

    scan时指定需要的Column Family,可以减少网络传输数据量,否则默认scan操作会返回整行所有Column Family的数据。

    3.2.3 Close ResultScanner

    通过scan取完数据后,记得要关闭ResultScanner,否则RegionServer可能会出现问题(对应的Server资源无法释放)。

    3.3 批量读

    通过调用HTable.get(Get)方法可以根据一个指定的row key获取一行记录,同样HBase提供了另一个方法:通过调用HTable.get(List<Get>)方法可以根据一个指定的rowkey列表,批量获取多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高而且网络传输RTT高的情景下可能带来明显 的性能提升。

    3.4 多线程并发读

    在客户端开启多个HTable读线程,每个读线程负责通过HTable对象进行get操作。下面是一个多线程并发读取HBase,获取店铺一天内各分钟PV值的例子:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
     
    1. public class DataReaderServer{  
    2.      //获取店铺一天内各分钟PV值的入口函数  
    3.      public static ConcurrentHashMap<String,String> getUnitMinutePV(long uid, long startStamp, long endStamp){  
    4.          long min = startStamp;  
    5.          int count = (int)((endStamp -startStamp) / (60*1000));  
    6.          List<String> lst = newArrayList<String>();  
    7.          for (int i = 0; i <= count; i++) {  
    8.             min = startStamp + i * 60 * 1000;  
    9.             lst.add(uid + "_" + min);  
    10.          }  
    11.          return parallelBatchMinutePV(lst);  
    12.      }  
    13.       //多线程并发查询,获取分钟PV值  
    14. private staticConcurrentHashMap<String, String>parallelBatchMinutePV(List<String> lstKeys){  
    15.         ConcurrentHashMap<String, String>hashRet = new ConcurrentHashMap<String, String>();  
    16.         int parallel = 3;  
    17.         List<List<String>>lstBatchKeys  = null;  
    18.         if (lstKeys.size() < parallel ){  
    19.             lstBatchKeys  = new ArrayList<List<String>>(1);  
    20.             lstBatchKeys.add(lstKeys);  
    21.         }  
    22.         else{  
    23.             lstBatchKeys  = newArrayList<List<String>>(parallel);  
    24.             for(int i = 0; i < parallel;i++  ){  
    25.                 List<String> lst = newArrayList<String>();  
    26.                 lstBatchKeys.add(lst);  
    27.             }  
    28.             for(int i = 0 ; i <lstKeys.size() ; i ++ ){  
    29.                lstBatchKeys.get(i%parallel).add(lstKeys.get(i));  
    30.             }  
    31.         }  
    32.         List<Future<ConcurrentHashMap<String, String> >> futures = newArrayList<Future< ConcurrentHashMap<String, String> >>(5);  
    33.         ThreadFactoryBuilder builder = newThreadFactoryBuilder();  
    34.        builder.setNameFormat("ParallelBatchQuery");  
    35.         ThreadFactory factory =builder.build();  
    36.         ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(),factory);  
    37.         for(List<String> keys :lstBatchKeys){  
    38.             Callable<ConcurrentHashMap<String, String> > callable = newBatchMinutePVCallable(keys);  
    39.             FutureTask<ConcurrentHashMap<String, String> > future = (FutureTask<ConcurrentHashMap<String, String> >) executor.submit(callable);  
    40.             futures.add(future);  
    41.         }  
    42.         executor.shutdown();  
    43.         // Wait for all the tasks to finish  
    44.         try {  
    45.           boolean stillRunning = !executor.awaitTermination(  
    46.               5000000, TimeUnit.MILLISECONDS);  
    47.           if (stillRunning) {  
    48.             try {  
    49.                 executor.shutdownNow();  
    50.             } catch (Exception e) {  
    51.                 // TODO Auto-generated catchblock  
    52.                 e.printStackTrace();  
    53.             }  
    54.           }  
    55.         } catch (InterruptedException e) {  
    56.           try {  
    57.              Thread.currentThread().interrupt();  
    58.           } catch (Exception e1) {  
    59.             // TODO Auto-generated catch block  
    60.             e1.printStackTrace();  
    61.           }  
    62.         }  
    63.         // Look for any exception  
    64.         for (Future f : futures) {  
    65.           try {  
    66.               if(f.get() != null)  
    67.               {  
    68.                  hashRet.putAll((ConcurrentHashMap<String, String>)f.get());  
    69.               }  
    70.           } catch (InterruptedException e) {  
    71.             try {  
    72.                 Thread.currentThread().interrupt();  
    73.             } catch (Exception e1) {  
    74.                 // TODO Auto-generated catchblock  
    75.                 e1.printStackTrace();  
    76.             }  
    77.           } catch (ExecutionException e) {  
    78.             e.printStackTrace();  
    79.           }  
    80.         }  
    81.         return hashRet;  
    82.     }  
    83.      //一个线程批量查询,获取分钟PV值  
    84.     protected staticConcurrentHashMap<String, String> getBatchMinutePV(List<String>lstKeys){  
    85.         ConcurrentHashMap<String, String>hashRet = null;  
    86.         List<Get> lstGet = newArrayList<Get>();  
    87.         String[] splitValue = null;  
    88.         for (String s : lstKeys) {  
    89.             splitValue =s.split("_");  
    90.             long uid =Long.parseLong(splitValue[0]);  
    91.             long min =Long.parseLong(splitValue[1]);  
    92.             byte[] key = new byte[16];  
    93.             Bytes.putLong(key, 0, uid);  
    94.             Bytes.putLong(key, 8, min);  
    95.             Get g = new Get(key);  
    96.             g.addFamily(fp);  
    97.             lstGet.add(g);  
    98.         }  
    99.         Result[] res = null;  
    100.         try {  
    101.             res =tableMinutePV[rand.nextInt(tableN)].get(lstGet);  
    102.         } catch (IOException e1) {  
    103.             logger.error("tableMinutePV exception,e=" + e1.getStackTrace());  
    104.         }  
    105.         if (res != null && res.length> 0) {  
    106.             hashRet = newConcurrentHashMap<String, String>(res.length);  
    107.             for (Result re : res) {  
    108.                 if (re != null &&!re.isEmpty()) {  
    109.                     try {  
    110.                         byte[] key =re.getRow();  
    111.                         byte[] value =re.getValue(fp, cp);  
    112.                         if (key != null&& value != null) {  
    113.                            hashRet.put(String.valueOf(Bytes.toLong(key,  
    114.                                    Bytes.SIZEOF_LONG)), String.valueOf(Bytes  
    115.                                    .toLong(value)));  
    116.                         }  
    117.                     } catch (Exception e2) {  
    118.                        logger.error(e2.getStackTrace());  
    119.                     }  
    120.                 }  
    121.             }  
    122.         }  
    123.         return hashRet;  
    124.     }  
    125. }  
    126. //调用接口类,实现Callable接口  
    127. class BatchMinutePVCallableimplements Callable<ConcurrentHashMap<String, String>>{  
    128.      private List<String> keys;  
    129.      publicBatchMinutePVCallable(List<String> lstKeys ) {  
    130.          this.keys = lstKeys;  
    131.      }  
    132.      public ConcurrentHashMap<String,String> call() throws Exception {  
    133.          returnDataReadServer.getBatchMinutePV(keys);  
    134.      }  
    135. }  

    3.5 缓存查询结果

    对于频繁查询HBase的应用场景,可以考虑在应用程序中做缓存,当有新的查询请求时,首先在缓存中查找,如果存在则直接返回,不再查询HBase;否则对HBase发起读请求查询,然后在应用程序中将查询结果缓存起来。至于缓存的替换策略,可以考虑LRU等常用的策略。

    3.6 Blockcache

    HBase上Regionserver的内存分为两个部分,一部分作为Memstore,主要用来写;另外一部分作为BlockCache,主要用于读。写请求会先写入Memstore,Regionserver会给每个region提供一个Memstore,当Memstore满64MB以后,会启动 flush刷新到磁盘。当Memstore的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。读请求先到Memstore中查数据,查不到就到BlockCache中查,再查不到就会到磁盘上读,并把读的结果放入BlockCache。由于 BlockCache采用的是LRU策略,因此BlockCache达到上限(heapsize *hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能启动。默认BlockCache为0.2,而Memstore为0.4。对于注重读响应时间的系统,可以将 BlockCache设大些,比如设置BlockCache=0.4,Memstore=0.39,以加大缓存的命中率。

    4、参考资料

    http://blog.linezing.com/2012/03/hbase-performance-optimization(Hbase性能方法优化总结)

    http://blog.csdn.net/woshiwanxin102213/article/details/18666657
  • 相关阅读:
    【重学计算机】计组D2章:数据表示
    【重学计算机】计组D1章:计算机系统概论
    计算机底层原理杂谈(白话文)
    阿里云安装wordpress遇到的问题
    wordpress数据表结构
    家用计费系统ER图
    java 类中的属性为什么一般都是私有的
    centos 软件库安装
    linux下启动tomcat----Cannot find ./catalina.sh
    jfreechart图表汉字乱码问题解决方案
  • 原文地址:https://www.cnblogs.com/cxzdy/p/5255329.html
Copyright © 2011-2022 走看看