zoukankan      html  css  js  c++  java
  • kudu基础入门

    一、背景介绍

    在KUDU之前,大数据主要以两种方式存储;

    (1)静态数据:

    以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。这类存储的局限性是数据无法进行随机的读写。

    (2)动态数据:

    以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。这类存储的局限性是批量读取吞吐量远不如 HDFS,不适用于批量数据分析的场景。

    从上面分析可知,这两种数据在存储方式上完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢?这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求。

    数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,我们定时(通常是 T+1 或者 T+H)将 HBase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如:HDFS)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的场景。但是缺点也比较明显:

    (1)架构复杂。从架构上看,数据在HBase、消息队列、HDFS 间流转,涉及环节太多,运维成本很高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。

    (2)时效性低。数据从HBase导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时效性上不是很高。

    (3)难以应对后续的更新。真实场景中,总会有数据是延迟到达的。如果这些数据之前已经从HBase导出到HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后重写一遍,但这代价又很高。

    为了解决上述架构的这些问题,KUDU应运而生。KUDU的定位是Fast Analytics on Fast Data,是一个既支持随机读写、又支持 OLAP 分析的大数据存储引擎。

    二、kudu基础

    2.1 使用场景

    适用于那些既有随机访问,也有批量数据扫描的复合场景、高计算量的场景、使用了高性能的存储设备,包括使用更多的内存、支持数据更新,避免数据反复迁移、支持跨地域的实时数据备份和查询。

    2.2 kudu架构

     与HDFS和HBase相似,Kudu使用单个的Master节点,用来管理集群的元数据,并且使用任意数量的Tablet Server(可对比理解HBase中的RegionServer角色)节点用来存储实际数据。可以部署多个Master节点来提高容错性。一个table表的数据,被分割成1个或多个Tablet,Tablet被部署在Tablet Server来提供数据读写服务。

    一些基本概念:

    Master:集群中的老大,负责集群管理、元数据管理等功能

    Tablet Server: 集群中的小弟,负责数据存储,并提供数据读写服务。一个 tablet server 存储了table表的tablet 和为 tablet 向 client 提供服务。对于给定的 tablet,一个tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。

    Table(表):一张table是数据存储在Kudu的tablet server中。表具有 schema 和全局有序的primary key(主键)。table 被分成称为 tablets 的 segments。

    Tablet:一个 tablet 是一张 table连续的segment,tablet是kudu表的水平分区,类似于google Bigtable的tablet,或者HBase的region。每个tablet存储着一定连续range的数据(key),且tablet两两间的range不会重叠。一张表的所有tablet包含了这张表的所有key空间。与其它数据存储引擎或关系型数据库中的 partition(分区)相似。给定的tablet 冗余到多个 tablet 服务器上,并且在任何给定的时间点,其中一个副本被认为是leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为 tablet 服务的一组 tablet server之间达成一致性。

    三、kudu分区

    为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tablet servers 上。行总是属于单个tablet 。将行分配给 tablet 的方法由在表创建期间设置的表的分区决定。 kudu提供了3种分区方式。

    3.1  Range Partitioning ( 范围分区 )

    范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象.

      1 /**
      2 
      3 * 测试分区:
      4 
      5 * RangePartition
      6 
      7 */
      8 
      9 @Test
     10 
     11 public void testRangePartition() throws KuduException {
     12 
     13 //设置表的schema
     14 
     15 LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
     16 
     17 columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
     18 
     19 columnSchemas.add(newColumn("WorkId", Type.INT32,false));
     20 
     21 columnSchemas.add(newColumn("Name", Type.STRING,false));
     22 
     23 columnSchemas.add(newColumn("Gender", Type.STRING,false));
     24 
     25 columnSchemas.add(newColumn("Photo", Type.STRING,false));
     26 
     27    
     28 
     29 //创建schema
     30 
     31 Schema schema = new Schema(columnSchemas);
     32 
     33    
     34 
     35 //创建表时提供的所有选项
     36 
     37 CreateTableOptions tableOptions = new CreateTableOptions();
     38 
     39 //设置副本数
     40 
     41 tableOptions.setNumReplicas(1);
     42 
     43 //设置范围分区的规则
     44 
     45 LinkedList<String> parcols = new LinkedList<String>();
     46 
     47 parcols.add("CompanyId");
     48 
     49 //设置按照那个字段进行range分区
     50 
     51 tableOptions.setRangePartitionColumns(parcols);
     52 
     53    
     54 
     55 /**
     56 
     57 * range
     58 
     59 * 0 < value < 10
     60 
     61 * 10 <= value < 20
     62 
     63 * 20 <= value < 30
     64 
     65 * ........
     66 
     67 * 80 <= value < 90
     68 
     69 * */
     70 
     71 int count=0;
     72 
     73 for(int i =0;i<10;i++){
     74 
     75 //范围开始
     76 
     77 PartialRow lower = schema.newPartialRow();
     78 
     79 lower.addInt("CompanyId",count);
     80 
     81    
     82 
     83 //范围结束
     84 
     85 PartialRow upper = schema.newPartialRow();
     86 
     87 count +=10;
     88 
     89 upper.addInt("CompanyId",count);
     90 
     91    
     92 
     93 //设置每一个分区的范围
     94 
     95 tableOptions.addRangePartition(lower,upper);
     96 
     97 }
     98 
     99    
    100 
    101 try {
    102 
    103 kuduClient.createTable("student",schema,tableOptions);
    104 
    105 } catch (KuduException e) {
    106 
    107 e.printStackTrace();
    108 
    109 }
    110 
    111 kuduClient.close();
    112 
    113 }
    范围分区

    3.2 Hash Partitioning (哈希分区)

    哈希分区通过哈希值将行分配到许多 buckets ( 存储桶 )之一; 哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在 tablet 之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。

     1 /**
     2 
     3 * 测试分区:
     4 
     5 * hash分区
     6 
     7 */
     8 
     9 @Test
    10 
    11 public void testHashPartition() throws KuduException {
    12 
    13 //设置表的schema
    14 
    15 LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
    16 
    17 columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
    18 
    19 columnSchemas.add(newColumn("WorkId", Type.INT32,false));
    20 
    21 columnSchemas.add(newColumn("Name", Type.STRING,false));
    22 
    23 columnSchemas.add(newColumn("Gender", Type.STRING,false));
    24 
    25 columnSchemas.add(newColumn("Photo", Type.STRING,false));
    26 
    27    
    28 
    29 //创建schema
    30 
    31 Schema schema = new Schema(columnSchemas);
    32 
    33    
    34 
    35 //创建表时提供的所有选项
    36 
    37 CreateTableOptions tableOptions = new CreateTableOptions();
    38 
    39 //设置副本数
    40 
    41 tableOptions.setNumReplicas(1);
    42 
    43 //设置范围分区的规则
    44 
    45 LinkedList<String> parcols = new LinkedList<String>();
    46 
    47 parcols.add("CompanyId");
    48 
    49 //设置按照那个字段进行range分区
    50 
    51 tableOptions.addHashPartitions(parcols,6);
    52 
    53 try {
    54 
    55 kuduClient.createTable("dog",schema,tableOptions);
    56 
    57 } catch (KuduException e) {
    58 
    59 e.printStackTrace();
    60 
    61 }
    62 
    63 
    64 kuduClient.close();
    65 
    66 }
    哈希分区

    3.3 Multilevel Partitioning ( 多级分区 )

    Kudu 允许一个表在单个表上组合多级分区。 当正确使用时,多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点需求.

     1 /**
     2 
     3 * 测试分区:
     4 
     5 * 多级分区
     6 
     7 * Multilevel Partition
     8 
     9 * 混合使用hash分区和range分区
    10 
    11 *
    12 
    13 * 哈希分区有利于提高写入数据的吞吐量,而范围分区可以避免tablet无限增长问题,
    14 
    15 * hash分区和range分区结合,可以极大的提升kudu的性能
    16 
    17 */
    18 
    19 @Test
    20 
    21 public void testMultilevelPartition() throws KuduException {
    22 
    23 //设置表的schema
    24 
    25 LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
    26 
    27 columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
    28 
    29 columnSchemas.add(newColumn("WorkId", Type.INT32,false));
    30 
    31 columnSchemas.add(newColumn("Name", Type.STRING,false));
    32 
    33 columnSchemas.add(newColumn("Gender", Type.STRING,false));
    34 
    35 columnSchemas.add(newColumn("Photo", Type.STRING,false));
    36 
    37    
    38 
    39 //创建schema
    40 
    41 Schema schema = new Schema(columnSchemas);
    42 
    43 //创建表时提供的所有选项
    44 
    45 CreateTableOptions tableOptions = new CreateTableOptions();
    46 
    47 //设置副本数
    48 
    49 tableOptions.setNumReplicas(1);
    50 
    51 //设置范围分区的规则
    52 
    53 LinkedList<String> parcols = new LinkedList<String>();
    54 
    55 parcols.add("CompanyId");
    56 
    57    
    58 
    59 //hash分区
    60 tableOptions.addHashPartitions(parcols,5);
    61 
    62 
    63 
    64 //range分区
    65 
    66 int count=0;
    67 for(int i=0;i<10;i++){
    68 PartialRow lower = schema.newPartialRow();
    69 lower.addInt("CompanyId",count);
    70 count+=10;
    71 
    72 PartialRow upper = schema.newPartialRow();
    73 upper.addInt("CompanyId",count);
    74 tableOptions.addRangePartition(lower,upper);
    75 
    76 }
    77 
    78 try {
    79 
    80 kuduClient.createTable("cat",schema,tableOptions);
    81 
    82 } catch (KuduException e) {
    83 
    84 e.printStackTrace();
    85 
    86 }
    87 
    88 kuduClient.close();
    89 
    90 }
    混合分区

    四、kudu原理

    4.1 表与schema

    Kudu设计是面向结构化存储的,因此,Kudu的表需要用户在建表时定义它的Schema信息,这些Schema信息包含:列定义(含类型),Primary Key定义(用户指定的若干个列的有序组合)。数据的唯一性,依赖于用户所提供的Primary Key中的Column组合的值的唯一性。 Kudu提供了Alter命令来增删列,但位于Primary Key中的列是不允许删除的。 Kudu当前并不支持二级索引。 从用户角度来看,Kudu是一种存储结构化数据表的存储系统。在一个Kudu集群中可以定义任意数量的table,每个table都需要预先定义好schema。每个table的列数是确定的,每一列都需要有名字和类型,每个表中可以把其中一列或多列定义为主键。这么看来,Kudu更像关系型数据库,而不是像HBase、Cassandra和MongoDB这些NoSQL数据库。不过Kudu目前还不能像关系型数据一样支持二级索引。

    Kudu使用确定的列类型,而不是类似于NoSQL的"everything is byte"。这可以带来两点好处: 确定的列类型使Kudu可以进行类型特有的编码。 可以提供 SQL-like 元数据给其他上层查询工具,比如BI工具。

    4.2 kudu的底层数据模型

    kudu的底层数据文件的存储,未采用HDFS这样的较高抽象层次的分布式文件系统,而是自行开发了一套可基于

    Table/Tablet/Replica视图级别的底层存储系统。

    这套实现基于如下的几个设计目标:

    • 可提供快速的列式查询
    • 可支持快速的随机更新
    • 可提供更为稳定的查询性能保障

    一张表会分成若干个tablet,每个tablet包括MetaData元信息及若干个RowSet,RowSet包含一个MemRowSet及若干个DiskRowSet,DiskRowSet中包含一个BloomFile、Ad_hoc Index、BaseData、DeltaMem及若干个RedoFile和UndoFile(UndoFile一般情况下只有一个)。

    • MemRowSet:用于新数据insert及已在MemRowSet中的数据的更新,一个MemRowSet写满后会将数据刷到磁盘形成若干个DiskRowSet。每次到达32M生成一个DiskRowSet。
    • DiskRowSet:用于老数据的变更(mutation),后台定期对DiskRowSet做compaction,以删除没用的数据及合并历史数据,减少查询过程中的IO开销。
    • BloomFile:根据一个DiskRowSet中的key生成一个bloom filter,用于快速模糊定位某个key是否在DiskRowSet中存在。
    • Ad_hocIndex:是主键的索引,用于定位key在DiskRowSet中的具体哪个偏移位置。
    • BaseData是MemRowSet flush下来的数据,按列存储,按主键有序。
    • UndoFile是基于BaseData之前时间的历史数据,通过在BaseData上apply UndoFile中的记录,可以获得历史数据。
    • RedoFile是基于BaseData之后时间的变更(mutation)记录,通过在BaseData上apply RedoFile中的记录,可获得较新的数据。
    • DeltaMem用于DiskRowSet中数据的变更mutation,先写到内存中,写满后flush到磁盘形成RedoFile。

     

    MemRowSets可以对比理解成HBase中的MemStore, 而DiskRowSets可理解成HBase中的HFile。MemRowSets中的数据按照行试图进行存储,数据结构为B-Tree。

    MemRowSets中的数据被Flush到磁盘之后,形成DiskRowSets。

    DisRowSets中的数据,按照32MB大小为单位,按序划分为一个个的DiskRowSet。 DiskRowSet中的数据按照Column进行组织,与Parquet类似。

    这是Kudu可支持一些分析性查询的基础。每一个Column的数据被存储在一个相邻的数据区域,而这个数据区域进一步被细分成一个个的小的Page单元,与HBase File中的Block类似,对每一个Column Page可采用一些Encoding算法,以及一些通用的Compression算法。 既然可对Column Page可采用Encoding以及Compression算法,那么,对单条记录的更改就会比较困难了。

    前面提到了Kudu可支持单条记录级别的更新/删除,是如何做到的?

    与HBase类似,也是通过增加一条新的记录来描述这次更新/删除操作的。DiskRowSet是不可修改了,那么 KUDU 要如何应对数据的更新呢?在KUDU中,把DiskRowSet分为了两部分:base data、delta stores。base data 负责存储基础数据,delta stores负责存储 base data 中的变更数据.

    如上图所示,数据从 MemRowSet 刷到磁盘后就形成了一份 DiskRowSet(只包含 base data),每份 DiskRowSet 在内存中都会有一个对应的DeltaMemStore,负责记录此 DiskRowSet 后续的数据变更(更新、删除)。DeltaMemStore 内部维护一个 B-树索引,映射到每个 row_offset 对应的数据变更。DeltaMemStore 数据增长到一定程度后转化成二进制文件存储到磁盘,形成一个 DeltaFile,随着 base data 对应数据的不断变更,DeltaFile 逐渐增长。

     4.3 kudu的写流程

    如上图,当 Client 请求写数据时,先根据主键从Master Server中获取要访问的目标 Tablets,然后到依次对应的Tablet获取数据。

    因为KUDU表存在主键约束,所以需要进行主键是否已经存在的判断,这里就涉及到之前说的索引结构对读写的优化了。一个Tablet中存在很多个RowSets,为了提升性能,我们要尽可能地减少要扫描的RowSets数量。

    首先,我们先通过每个 RowSet 中记录的主键的(最大最小)范围,过滤掉一批不存在目标主键的RowSets,然后在根据RowSet中的布隆过滤器,过滤掉确定不存在目标主键的 RowSets,最后再通过RowSets中的 B-树索引,精确定位目标主键是否存在。

    如果主键已经存在,则报错(主键重复),否则就进行写数据(写 MemRowSet)。

    4.4 kudu的读流程

    如上图,数据读取过程大致如下:先根据要扫描数据的主键范围,定位到目标的Tablets,然后读取Tablets 中的RowSets。

    在读取每个RowSet时,先根据主键过滤要scan范围,然后加载范围内的base data,再找到对应的delta stores,应用所有变更,最后union上MemRowSet中的内容,返回数据给Client。

    4.5 kudu的更新流程

    数据更新的核心是定位到待更新数据的位置,这块与写入的时候类似,就不展开了,等定位到具体位置后,然后将变更写到对应的delta store 中。

    转自:https://www.cnblogs.com/starzy/p/10573508.html

  • 相关阅读:
    转:Omnet++ 4.0 installation for Ubuntu
    转:myeclipse假死的解决方案
    omnet++ 4.0下使用XML的例子
    转:Microsoft JET Database Engine (0x80004005) 未指定的错误的完美解决
    C# 数据库删除操作错误报错 System.Data.SqlClient.SqlException (0x80131904)
    Windows 7 转移用户文件夹
    CentOS自动登录Gnome
    Archlinux GRUB2 配置
    Archlinux 登录管理器切换
    html2chm工具1.0发布
  • 原文地址:https://www.cnblogs.com/liuzhongfeng/p/11979259.html
Copyright © 2011-2022 走看看