zoukankan      html  css  js  c++  java
  • Kudu存储实战笔记

      有人会问,为啥要用这个叫啥Kudu的,Kudu是啥?

      就像官网所说,Kudu是一个针对Apache hadoop 平台而开发的列式存储管理器,在本菜鸟看来,它是一种介于hdfs与hbase的一种存储。它的优势在于:

      1、OLAP工作的快速处理,也就是针对于查询,很快,很牛逼。

      2、针对同时运行顺序和随机工作负载的情况性能很好。

      3、高可用,Table server和master使用Raft Consensus Algorithm节点来保证高可用,什么是Raft Consunsus Algorith?参考:https://www.cnblogs.com/mindwind/p/5231986.html),只要有一半以上的副本可用,该tablet便可用于读写。

      4、结构化数据模型(可以理解为带schema)。

       该图显示了一个具有三个 master 和多个 tablet server 的 Kudu 集群,每个服务器都支持多个 tablet。它说明了如何使用 Raft 共识来允许 master 和 tablet server 的 leader 和 f ollow。此外,tablet server 可以成为某些 tablet 的 leader,也可以是其他 tablet 的 follower。leader 以金色显示,而 follower 则显示为蓝色。

      下面是一些基本概念:

      Table(表)

      一张 talbe 是数据存储在 Kudu 的位置。表具有 schema 和全局有序的 primary key(主键)。table 被分成称为 tablets 的 segments。

      Tablet

      一个 tablet 是一张 table 连续的 segment,与其它数据存储引擎或关系型数据库中的 partition(分区)相似。给定的 tablet 冗余到多个 tablet 服务器上,并且在任何给定的时间点,其中一个副本被认为是 leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为 tablet 服务的一组 tablet server之间达成一致性。

      Tablet Server

      一个 tablet server 存储 tablet 和为 tablet 向 client 提供服务。对于给定的 tablet,一个 tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求。leader 使用 Raft Consunsus Algorithm来进行选举 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。

      具体我还没有那么深入,写了些api调用玩了一把,下面慢慢讲述,Kudu的API比较恶心的哈。。

      kudu的sql语法与传统的sql语法比较相似,但也不尽相同,直接解析时,具体sql语法请参考官网,下面以类似hive metastore表结构的方式封装了下。以下列sql为例:

        create table combined_t6 (x int64, s string, s2 string, primary key (x, s))
        partition by hash (x) partitions 10, range (x)
        (
          partition 0 <= values <= 49, partition 50 <= values <= 100
        ) REPLICAS 1

        public Boolean create(Table table,String operator) {
            LOGGER.info("kudu Table properties:" + table.getKvInfos().toString());
            List<ColumnSchema> columns = new ArrayList(table.getTableColumnList().size());
           KuduTableGenerateUtil.generateKuduColumn(table.getTableColumnList(),columns); Schema schema
    = new Schema(columns); KuduPartitionSchema kuduPartitionSchema = KuduTableGenerateUtil.parserPartition(table); CreateTableOptions tableOptions = KuduTableGenerateUtil.generateKuduTableOptions(table,schema,kuduPartitionSchema); try { getKuduClient(table).createTable(table.getTableName(), schema,tableOptions); } catch (KuduException e) { throw new MetadataInvalidObjectException(e, " create kudu storage table error!!"); } return true; }

      kudu的column属性中,包含有primarfyKey、encoding、compression algorithm、null table 、default value 、block size等属性,所以从上述代码中需要先将kuduColumn进行封装,构造ColumnSchema对象:

         new ColumnSchema.ColumnSchemaBuilder(tableColumn.getColumnName(), getKuduColumnType(tableColumn.getDataType()))
                 .key(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_PRIMARY_KEY)))
                 .nullable(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_IS_NULLTABLE)))
                 .defaultValue(defaultValue)
                 .desiredBlockSize(getDesiredBlockSize(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_DESIRED_BLOCKSIZE)))
                 .encoding(getColumnEncoding(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_ENCODING)))
                 .compressionAlgorithm(getCompressionType(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_COMPRESSION_ALGORITHM)))
                 .build();

      对于column的数据类型,有很多种,如下:

        private static Type getKuduColumnType(String dataType) {
            switch (dataType.toUpperCase()) {
                case "INT8":
                    return Type.INT8;
                case "INT16":
                    return Type.INT16;
                case "INT32":
                    return Type.INT32;
                case "INT64":
                    return Type.INT64;
                case "BINARY":
                    return Type.BINARY;
                case "STRING":
                    return Type.STRING;
                case "BOOL":
                    return Type.BOOL;
                case "FLOAT":
                    return Type.FLOAT;
                case "DOUBLE":
                    return Type.DOUBLE;
                case "UNIXTIME_MICROS":
                    return Type.UNIXTIME_MICROS;
                default:
                    return Type.STRING;
            }
        }

      压缩方式包括:

        public static CompressionAlgorithm getCompressionType(String compressionType) {
            if (StringUtils.isNotBlank(compressionType)) {
                switch (compressionType.toUpperCase()) {
                    case "UNKNOWN":
                        return CompressionAlgorithm.UNKNOWN;
                    case "DEFAULT_COMPRESSION":
                        return CompressionAlgorithm.DEFAULT_COMPRESSION;
                    case "NO_COMPRESSION":
                        return CompressionAlgorithm.NO_COMPRESSION;
                    case "SNAPPY":
                        return CompressionAlgorithm.SNAPPY;
                    case "LZ4":
                        return CompressionAlgorithm.LZ4;
                    case "ZLIB":
                        return CompressionAlgorithm.UNKNOWN.ZLIB;
                    default:
                        return null;
                }
            }
            return null;
        }

      随之我们要构造,Kudu Partition,Kudu Partition包含两种类型,一种是hashPartition,一种是rangePartition,其实从字面意思应该也能够想到,一种是用于对某个字段进行hash散列,一种是进行分区区间的设置,从而在查询时达到优化的效果,这里通过将sql解析后的转换的KuduPartitionSchema对象分别进行range与hash partition的组装,也就是将sql中 Partition表达式 partition 0 <= values <= 49, partition 50 <= values <= 100 封装:

        public static void generateHashPartition(CreateTableOptions tableOptions, List<HashPartitionSchema> hashPartitionSchemas) {
            if (null != hashPartitionSchemas && hashPartitionSchemas.size() != 0) {

            hashPartitionSchemas.forEach(hashPartitionSchema ->{
                tableOptions.addHashPartitions(hashPartitionSchema.getColumns(), hashPartitionSchema.getBucket());
            });     

          }

       }

    public static void generateRangePartition(Schema schema, CreateTableOptions tableOptions, RangePartitionSchema rangePartitionSchema) {
            tableOptions.setRangePartitionColumns(rangePartitionSchema.getColumns());
            List<RangeSplit> ranges = rangePartitionSchema.getRanges();
            ranges.forEach(range -> {
                tableOptions.addRangePartition(
                        getPartialRow(
                                range.getLower(),
                                schema,
                                rangePartitionSchema.getColumns()),
                        getPartialRow(
                                range.getUpper(),
                                schema,
                                rangePartitionSchema.getColumns()),
                        getRangePartitionBound(
                                range.getLowerBoundType()),
                        getRangePartitionBound(
                                range.getUpperBoundType())
                );
           });
        }
        public static RangePartitionBound getRangePartitionBound(String boundType) {
            if (StringUtils.isNotBlank(boundType)) {
                switch (boundType) {
                    case "EXCLUSIVE_BOUND":
                        return RangePartitionBound.EXCLUSIVE_BOUND;
                    case "INCLUSIVE_BOUND":
                        return RangePartitionBound.INCLUSIVE_BOUND;
                    default:
                        return null;
                }
            }
            return null;
        }

      最后构造,CreateTableOptions对象:

        public static CreateTableOptions generateKuduTableOptions(Table table, Schema schema, KuduPartitionSchema kuduPartitionSchema) {
            CreateTableOptions tableOptions = new CreateTableOptions();
            String numReplicas = table.getKvInfos().get(MetadataConfigKey.TABLE_KUDU_REPLICAS);
            if (StringUtils.isNotBlank(numReplicas)) {
                tableOptions.setNumReplicas(Integer.valueOf(numReplicas));
            }
            if (kuduPartitionSchema.getHashPartitionSchemaList() != null && kuduPartitionSchema.getHashPartitionSchemaList().size() != 0) {
                generateHashPartition(tableOptions, kuduPartitionSchema.getHashPartitionSchemaList());
            }
            if (kuduPartitionSchema.getRangePartitionSchema() != null) {
                generateRangePartition(schema, tableOptions, kuduPartitionSchema.getRangePartitionSchema());
            }
            return tableOptions;
        }

      没有hbase编程便捷。。不过对于kudu的连接而言,只需要配置kudu master的地址,便可创建连接。

        public KuduClient getKuduClient(Table table){
            if(null == kuduClient){
                try{
                    String kuduMaster = table.getStorageClusterKvs().get(MetadataConfigKey.CLUSTER_KUDU_MASTER);
                    kuduClient = new KuduClient.KuduClientBuilder(kuduMaster).build();
                }catch(Exception e){
                    throw new MetadataRuntimeException(e, " create kuduClient error!!");
                }
            }
            return kuduClient;
        }

       活儿干不完啊~改天再深入完 哈哈~

  • 相关阅读:
    探秘小程序(9):父页面与自定义组件交互
    探秘小程序(8):scroll-view组件
    探秘小程序(7):view组件
    探秘小程序(6):微信支付
    探秘小程序(5):用户信息
    通向全栈之路(6)—无密码ssh连接
    探秘小程序(4):授权
    探秘小程序(3):登录
    探秘小程序(2):自定义组件
    [Python]json对象转换出错expected string or buffer python
  • 原文地址:https://www.cnblogs.com/yangsy0915/p/8059602.html
Copyright © 2011-2022 走看看