zoukankan      html  css  js  c++  java
  • Kafka

    1.概述

      上次给大家分享了关于 Kafka SQL 的实现思路,这次给大家分享如何实现 Kafka SQL。要实现 Kafka SQL,在上一篇《Kafka - SQL 引擎分享》中分享了其实现的思路,核心包含数据源的加载,以及 SQL 树的映射。今天笔者给大家分享相关实现的代码。

    2.内容

      这里,将数据映射成 SQL Tree 是使用了 Apache Calcite 来承接这部分工作。在实现代码之前,我们首先来了解下 Apache Calcite 的相关内容,Apache Calcite 是一个面向 Hadoop 的查询引擎,它提供了业界标准的 SQL 语言,以及多种查询优化和连接各种存储介质的适配器。另外,还能处理 OLAP 和流处理场景。因为存在这么多优秀和闪光的特性, Hadoop 生态圈中 Apache Calcite 越发引人注目,被诸多项目所集成,常见的有:

    • Apache Drill:基于大数据的实时查询引擎
    • Apache Spark:继 Hadoop 之后的新一代大数据分布式处理框架。
    • 更多详情,这里就不一一列举了,详情查看地址:《Adapters

    2.1 数据类型

      这里数据源的数据类型,我们分为两种,一种是 SQL,另一种是基于编程语言的,这里我们所使用的是 Java,定义内容如下:

    public static Map<String, SqlTypeName> SQLTYPE_MAPPING = new HashMap<String, SqlTypeName>();
    public static Map<String, Class> JAVATYPE_MAPPING = new HashMap<String, Class>();
    
    public static void initRowType() {
            SQLTYPE_MAPPING.put("char", SqlTypeName.CHAR);
            JAVATYPE_MAPPING.put("char", Character.class);
            SQLTYPE_MAPPING.put("varchar", SqlTypeName.VARCHAR);
            JAVATYPE_MAPPING.put("varchar", String.class);
            // ......     
    }

    2.2 表的相关描述

      另外,我们需要对表进行一个描述,在关系型数据库中,一个正常的表由行列组成,定义内容如下:

        public static class Database {
            public List<Table> tables = new LinkedList<Table>();
        }
    
        public static class Table {
            public String tableName;
            public List<Column> columns = new LinkedList<Column>();
            public List<List<String>> data = new LinkedList<List<String>>();
        }
    
        public static class Column {
            public String name;
            public String type;
        }

      在每个集合中存储数据库相关名称,每个数据库存储多个集合的表对象,每个表对象下面又有一系列的列以及绑定的数据源。在每个列对象中包含字段名和类型,层层递进,依次关联。在使用 Calcite 是,需要遵循其 JSON Model,上篇博客我们已经定义过其 JSON Model,这里我们直接拿来使用,内容如下:

    {
        version: '1.0',
        defaultSchema: 'kafka',  
        schemas: [  
            {
                name: 'kafka',  
                type: 'custom',
                factory: 'cn.smartloli.kafka.visual.engine.KafkaMemorySchemaFactory',  
                operand: {
                    database: 'kafka_db'
                }  
            } 
        ]
    }

       要实现其 Model ,这里需要我们去实现 org.apache.calcite.schema.SchemaFactory 的接口,内容如下所示:

    public class KafkaMemorySchemaFactory implements SchemaFactory {
        @Override
        public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
            return new KafkaMemorySchema(name);
        }
    }

      而在 KafkaMemorySchema 类中,我们只需要实现它的 getTableMap 方法,内容如下所示:

     @Override
     protected Map<String, Table> getTableMap() {
       Map<String, Table> tables = new HashMap<String, Table>();
        Database database = KafkaMemoryData.MAP.get(this.dbName);
        if (database == null)
          return tables;
        for (KafkaMemoryData.Table table : database.tables) {
          tables.put(table.tableName, new KafkaMemoryTable(table));
        }
        return tables;
     }

      从上述代码中,可以知道通过内存中的 Map 表查看对应的数据库对象,然后根据数据库对象中的表作为 Schema 中的表,而表的类型为 KafkaMemoryTable。

    2.3 表类型

      这里笔者就直接使用全表扫描,使用 org.apache.calcite.schema.impl.AbstractTable 的默认方式,实现其 getRowType 方法和 scan 方法,内容如下所示:

    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
      if(dataType == null) {
         RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
          for (KafkaMemoryData.Column column : this.sourceTable.columns) {
            RelDataType sqlType = typeFactory.createJavaType(
            KafkaMemoryData.JAVATYPE_MAPPING.get(column.type));
            sqlType = SqlTypeUtil.addCharsetAndCollation(sqlType, typeFactory);
            fieldInfo.add(column.name, sqlType);
          }
          this.dataType = typeFactory.createStructType(fieldInfo);
       }
       return this.dataType;
    }
    public Enumerable<Object[]> scan(DataContext root) {
            final List<String> types = new ArrayList<String>(sourceTable.columns.size());
            for(KafkaMemoryData.Column column : sourceTable.columns) {
                types.add(column.type);
            }
            final int[] fields = identityList(this.dataType.getFieldCount());
            return new AbstractEnumerable<Object[]>() {
                public Enumerator<Object[]> enumerator() {
                    return new KafkaMemoryEnumerator<Object[]>(fields, types, sourceTable.data);
                }
            };
        }

      代码中,表中的字段名和类型是根据初始化时,每个表中的数据类型映射匹配的,在 KafkaMemoryData.SQLTYPE_MAPPING 和 KafkaMemoryData.JAVATYPE_MAPPING 中有描述相关自定义类型映射,这里就不多做赘述了。

      实现流程大致就是这个样子,将每次的 SQL 查询,通过 Calcite 解析成标准可执行的 SQL 计划,执行期间会根据定义的信息,初始化每一个 Schema,在通过调用 getTableMap 获取字段名和类型,根据这些信息判断查询的表,字段名,类型以及 SQL 语法是否标准规范。然后在使用 Calcite 内部机制,生成物理执行计划。查询计划是 Tree 形式的,底层是进行扫表操作(可看作为 FROM),获取每个表的数据,之后在根据表数据进行上层的关联操作,如 JOIN,GROUP BY,LIMIT 等操作。

    3.测试

      完成上述流程后,进行代码测试,测试代码如下所示:

    public static void main(String[] args) {
            try {
                Class.forName("org.apache.calcite.jdbc.Driver");
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            Properties info = new Properties();
            try {
                Connection connection = DriverManager.getConnection("jdbc:calcite:model=/Users/dengjie/hadoop/workspace/kafka/kafka-visual/src/main/resources/plugins.json",info);    
                Statement st = connection.createStatement();
                // String sql = "select * from "Kafka" where "_plat"='1004' limit 1";
                String sql = "select * from "Kafka" limit 10";
    
                long start = System.currentTimeMillis();
                result = st.executeQuery(sql);
                ResultSetMetaData rsmd = result.getMetaData();
                List<Map<String, Object>> ret = new ArrayList<Map<String,Object>>();
                
                while (result.next()) {
                    Map<String, Object> map = new HashMap<String, Object>();
                    for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                        System.out.print(result.getString(rsmd.getColumnName(i)) + " ");
                        map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
                    }
                    ret.add(map);
                    System.out.println();
                }
                System.out.println(new Gson().toJson(ret));       
                result.close();
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

    4.总结

      以上便是将 Kafka 中数据消费后,作为数据源加载和 SQL Tree 映射的实现代码,实现不算太困难,在编写 SQL 查询的时候,需要遵循标准的 SQL 语法来操作数据源。

    5.结束语

      这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

  • 相关阅读:
    JVM系列文章(三):Class文件内容解析
    android开发 不注意的异常
    【数据结构】二叉树
    Android解析中国天气网的Json数据
    最简单也最难——怎样获取到Android控件的高度
    Android通过HTTP POST带參訪问asp.net网页
    js 推断 当页面无法回退时(history.go(-1)),关闭网页
    SQL Server数据库存储过程的异常处理
    SQL Server代码如何快速格式化,sqlserver代码
    sql server 获取指定格式的当前日期
  • 原文地址:https://www.cnblogs.com/smartloli/p/5470941.html
Copyright © 2011-2022 走看看