zoukankan      html  css  js  c++  java
  • Kafka

    1.概述

      上次给大家分享了关于 Kafka SQL 的实现思路,这次给大家分享如何实现 Kafka SQL。要实现 Kafka SQL,在上一篇《Kafka - SQL 引擎分享》中分享了其实现的思路,核心包含数据源的加载,以及 SQL 树的映射。今天笔者给大家分享相关实现的代码。

    2.内容

      这里,将数据映射成 SQL Tree 是使用了 Apache Calcite 来承接这部分工作。在实现代码之前,我们首先来了解下 Apache Calcite 的相关内容,Apache Calcite 是一个面向 Hadoop 的查询引擎,它提供了业界标准的 SQL 语言,以及多种查询优化和连接各种存储介质的适配器。另外,还能处理 OLAP 和流处理场景。因为存在这么多优秀和闪光的特性, Hadoop 生态圈中 Apache Calcite 越发引人注目,被诸多项目所集成,常见的有:

    • Apache Drill:基于大数据的实时查询引擎
    • Apache Spark:继 Hadoop 之后的新一代大数据分布式处理框架。
    • 更多详情,这里就不一一列举了,详情查看地址:《Adapters

    2.1 数据类型

      这里数据源的数据类型,我们分为两种,一种是 SQL,另一种是基于编程语言的,这里我们所使用的是 Java,定义内容如下:

    public static Map<String, SqlTypeName> SQLTYPE_MAPPING = new HashMap<String, SqlTypeName>();
    public static Map<String, Class> JAVATYPE_MAPPING = new HashMap<String, Class>();
    
    public static void initRowType() {
            SQLTYPE_MAPPING.put("char", SqlTypeName.CHAR);
            JAVATYPE_MAPPING.put("char", Character.class);
            SQLTYPE_MAPPING.put("varchar", SqlTypeName.VARCHAR);
            JAVATYPE_MAPPING.put("varchar", String.class);
            // ......     
    }

    2.2 表的相关描述

      另外,我们需要对表进行一个描述,在关系型数据库中,一个正常的表由行列组成,定义内容如下:

        public static class Database {
            public List<Table> tables = new LinkedList<Table>();
        }
    
        public static class Table {
            public String tableName;
            public List<Column> columns = new LinkedList<Column>();
            public List<List<String>> data = new LinkedList<List<String>>();
        }
    
        public static class Column {
            public String name;
            public String type;
        }

      在每个集合中存储数据库相关名称,每个数据库存储多个集合的表对象,每个表对象下面又有一系列的列以及绑定的数据源。在每个列对象中包含字段名和类型,层层递进,依次关联。在使用 Calcite 是,需要遵循其 JSON Model,上篇博客我们已经定义过其 JSON Model,这里我们直接拿来使用,内容如下:

    {
        version: '1.0',
        defaultSchema: 'kafka',  
        schemas: [  
            {
                name: 'kafka',  
                type: 'custom',
                factory: 'cn.smartloli.kafka.visual.engine.KafkaMemorySchemaFactory',  
                operand: {
                    database: 'kafka_db'
                }  
            } 
        ]
    }

       要实现其 Model ,这里需要我们去实现 org.apache.calcite.schema.SchemaFactory 的接口,内容如下所示:

    public class KafkaMemorySchemaFactory implements SchemaFactory {
        @Override
        public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
            return new KafkaMemorySchema(name);
        }
    }

      而在 KafkaMemorySchema 类中,我们只需要实现它的 getTableMap 方法,内容如下所示:

     @Override
     protected Map<String, Table> getTableMap() {
       Map<String, Table> tables = new HashMap<String, Table>();
        Database database = KafkaMemoryData.MAP.get(this.dbName);
        if (database == null)
          return tables;
        for (KafkaMemoryData.Table table : database.tables) {
          tables.put(table.tableName, new KafkaMemoryTable(table));
        }
        return tables;
     }

      从上述代码中,可以知道通过内存中的 Map 表查看对应的数据库对象,然后根据数据库对象中的表作为 Schema 中的表,而表的类型为 KafkaMemoryTable。

    2.3 表类型

      这里笔者就直接使用全表扫描,使用 org.apache.calcite.schema.impl.AbstractTable 的默认方式,实现其 getRowType 方法和 scan 方法,内容如下所示:

    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
      if(dataType == null) {
         RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
          for (KafkaMemoryData.Column column : this.sourceTable.columns) {
            RelDataType sqlType = typeFactory.createJavaType(
            KafkaMemoryData.JAVATYPE_MAPPING.get(column.type));
            sqlType = SqlTypeUtil.addCharsetAndCollation(sqlType, typeFactory);
            fieldInfo.add(column.name, sqlType);
          }
          this.dataType = typeFactory.createStructType(fieldInfo);
       }
       return this.dataType;
    }
    public Enumerable<Object[]> scan(DataContext root) {
            final List<String> types = new ArrayList<String>(sourceTable.columns.size());
            for(KafkaMemoryData.Column column : sourceTable.columns) {
                types.add(column.type);
            }
            final int[] fields = identityList(this.dataType.getFieldCount());
            return new AbstractEnumerable<Object[]>() {
                public Enumerator<Object[]> enumerator() {
                    return new KafkaMemoryEnumerator<Object[]>(fields, types, sourceTable.data);
                }
            };
        }

      代码中,表中的字段名和类型是根据初始化时,每个表中的数据类型映射匹配的,在 KafkaMemoryData.SQLTYPE_MAPPING 和 KafkaMemoryData.JAVATYPE_MAPPING 中有描述相关自定义类型映射,这里就不多做赘述了。

      实现流程大致就是这个样子,将每次的 SQL 查询,通过 Calcite 解析成标准可执行的 SQL 计划,执行期间会根据定义的信息,初始化每一个 Schema,在通过调用 getTableMap 获取字段名和类型,根据这些信息判断查询的表,字段名,类型以及 SQL 语法是否标准规范。然后在使用 Calcite 内部机制,生成物理执行计划。查询计划是 Tree 形式的,底层是进行扫表操作(可看作为 FROM),获取每个表的数据,之后在根据表数据进行上层的关联操作,如 JOIN,GROUP BY,LIMIT 等操作。

    3.测试

      完成上述流程后,进行代码测试,测试代码如下所示:

    public static void main(String[] args) {
            try {
                Class.forName("org.apache.calcite.jdbc.Driver");
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            Properties info = new Properties();
            try {
                Connection connection = DriverManager.getConnection("jdbc:calcite:model=/Users/dengjie/hadoop/workspace/kafka/kafka-visual/src/main/resources/plugins.json",info);    
                Statement st = connection.createStatement();
                // String sql = "select * from "Kafka" where "_plat"='1004' limit 1";
                String sql = "select * from "Kafka" limit 10";
    
                long start = System.currentTimeMillis();
                result = st.executeQuery(sql);
                ResultSetMetaData rsmd = result.getMetaData();
                List<Map<String, Object>> ret = new ArrayList<Map<String,Object>>();
                
                while (result.next()) {
                    Map<String, Object> map = new HashMap<String, Object>();
                    for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                        System.out.print(result.getString(rsmd.getColumnName(i)) + " ");
                        map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
                    }
                    ret.add(map);
                    System.out.println();
                }
                System.out.println(new Gson().toJson(ret));       
                result.close();
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

    4.总结

      以上便是将 Kafka 中数据消费后,作为数据源加载和 SQL Tree 映射的实现代码,实现不算太困难,在编写 SQL 查询的时候,需要遵循标准的 SQL 语法来操作数据源。

    5.结束语

      这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

  • 相关阅读:
    Blank page instead of the SharePoint Central Administration site
    BizTalk 2010 BAM Configure
    Use ODBA with Visio 2007
    Handling SOAP Exceptions in BizTalk Orchestrations
    BizTalk与WebMethods之间的EDI交换
    Append messages in BizTalk
    FTP protocol commands
    Using Dynamic Maps in BizTalk(From CodeProject)
    Synchronous To Asynchronous Flows Without An Orchestration的简单实现
    WSE3 and "Action for ultimate recipient is required but not present in the message."
  • 原文地址:https://www.cnblogs.com/smartloli/p/5470941.html
Copyright © 2011-2022 走看看