zoukankan      html  css  js  c++  java
  • Flink系统之Table API 和 SQL

      Flink提供了像表一样处理的API和像执行SQL语句一样把结果集进行执行。这样很方便的让大家进行数据处理了。比如执行一些查询,在无界数据和批处理的任务上,然后将这些按一定的格式进行输出,很方便的让大家像执行SQL一样简单。

      今天主要写的东西分为如下几个方面,然后遵循着下边几个方面进行展开:

      1. Flink的不同API的层级梗概。

      2. FlinkSQL的编程的步骤。

      3. Flink编程的例子。

      

    一、  Flink有着不同级别的API,不同级别的API方便不同用户进行处理。普通用户使用Datastream以及Dataset进行程序编写,我们可以在其更高的基础上使用Table API以及SQL,这也是Flink的强大之处,可以像使用处理表一样处理数据。如果想研究的更高可以看更底层的东西。

    SQL  High-level Language
    Table API Declarative  DSL
    Datastream / Dataset API Core API
    Stateful Stream Processing

    Low-level building block

    (streams, state, [event] time)

    二、 Flink的Table API 和 SQL编程步骤如下:

      1) 创建一个TableEnvironment表环境用于后续使用。TableEnvironment是 SQL 和 Table API的核心概念,它用于设置执行所需要的数据属性,和ExecutionEnvironment类似,它主要负责:

        a) 注册表数据源,从内部或者外部来源。

        b) 执行相应的SQL语句。

        c) 注册自定义集数。

        d 将结果集进行扫描和写入到目标数据源。

        e) 相同的environment可以执行相应的join unin操作。

      2)接下来,咱们看一下如何注册数据源,注意不同的Flink版本有不同的实现,但是核心的内容是不变的:

        a) 可以直接从数据集里进行注册。比如 tableEnvironment.registerDataSet()。

        b) 在一个已经存在的Table中直接执行scan或者select,那么会生成一个新的Table,也就是数据可以从已有的Table中再次获取,Table t = tableEnv.scan("x").select("a, b,c")。

        c) 可以是TableSource, 也就是从不同的文件、数据库、消息系统进行读取。 比如csv文件,TableSource csvSource = new CsvTableSource("path/to/file")。

      3)读取完数据后进行处理,处理完之后要存储起来,那么需要Sink(存储)到文件或者数据库、消息系统等。

        a) 比如Sink到CSV文件。 TableSink csvSink = new TableCSVSink("path/to/sink", ..)。

        b) Sink为指定字段句和类型到CSV文件中。

          指定表字段: String[] fieldNames = {"fild1", "filed2", "field3"}; 

          指定字段类型: TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; 

          指定表名和csv文件:tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

    三、接下来,看一下真实的例子。

        1)从给定的单词和单词的个数中统计一下,每个单词出现的数据,使用SQL语句进行实现查询统计。完整的样例如下(注意,不同的FLink版本实现上有稍微的差异):

    package myflink.sql;
    
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    
    
    public class WordCountSQL {
    
        public static void main(String[] args) throws Exception {
    
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.getTableEnvironment(env);
    
            DataSet<WC> input = env.fromElements(
                    WC.of("hello", 1),
                    WC.of("hqs", 1),
                    WC.of("world", 1),
                    WC.of("hello", 1)
            );
            //注册数据集
            tEnv.registerDataSet("WordCount", input, "word, frequency");
    
            //执行SQL,并结果集做为一个新表
            Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
    
            DataSet<WC> result = tEnv.toDataSet(table, WC.class);
    
            result.print();
    
        }
    
        public static class WC {
            public String word; //hello
            public long frequency;
    
            //创建构造方法,让flink进行实例化
            public WC() {}
    
            public static WC of(String word, long frequency) {
                WC wc = new WC();
                wc.word = word;
                wc.frequency = frequency;
                return wc;
            }
    
            @Override
            public String toString() {
                return "WC " + word + " " + frequency;
            }
        }
    
    }

     

      输出的结果为,和我们想的结果是一样的。

    WC world 1
    WC hello 2
    WC hqs 1

      2)接下来的例子会复杂一些,从一个txt文件中读取数据,txt文件中包含id, 人字, 书名,价格信息。然后将数据注册成一个表,然后将这个表的结果进行统计,按人名统计出来这个人买书所花费的钱,将结果sink到一个文件中。上代码。

      

    package myflink.sql;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.sinks.CsvTableSink;
    import org.apache.flink.table.sinks.TableSink;
    
    public class SQLFromFile {
    
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    
            env.setParallelism(1);
            //读取文件
            DataSource<String> input = env.readTextFile("test.txt");
            //将读取到的文件进行输出
            input.print();
            //转换为DataSet
            DataSet<Orders> inputDataSet = input.map(new MapFunction<String, Orders>() {
                @Override
                public Orders map(String s) throws Exception {
                    String[] splits = s.split(" ");
                    return Orders.of(Integer.valueOf(splits[0]), String.valueOf(splits[1]), String.valueOf(splits[2]), Double.valueOf(splits[3]));
                }
            });
            //转换为table
            Table order = tableEnv.fromDataSet(inputDataSet);
            //注册Orders表名
            tableEnv.registerTable("Orders", order);
            Table nameResult = tableEnv.scan("Orders").select("name");
            //输出一下表
            nameResult.printSchema();
    
            //执行一下查询
            Table sqlQueryResult = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc");
            //查询结果转换为DataSet
            DataSet<Result> result = tableEnv.toDataSet(sqlQueryResult, Result.class);
            result.print();
    
            //以tuple的方式进行输出
            result.map(new MapFunction<Result, Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> map(Result result) throws Exception {
                    String name = result.name;
                    Double total = result.total;
                    return Tuple2.of(name, total);
                }
            }).print();
    
            TableSink sink  = new CsvTableSink("SQLText.txt", " | ");
    
            //设置字段名
            String[] filedNames = {"name", "total"};
            //设置字段类型
            TypeInformation[] filedTypes = {Types.STRING(), Types.DOUBLE()};
    
            tableEnv.registerTableSink("SQLTEXT", filedNames, filedTypes, sink);
    
            sqlQueryResult.insertInto("SQLTEXT");
    
            env.execute();
    
        }
    
        public static class Orders {
            public Integer id;
            public String name;
            public String book;
            public Double price;
    
            public Orders() {
                super();
            }
    
            public static Orders of(Integer id, String name, String book, Double price) {
                Orders orders = new Orders();
                orders.id = id;
                orders.name = name;
                orders.book = book;
                orders.price = price;
                return orders;
            }
        }
    
        public static class Result {
            public String name;
            public Double total;
    
            public Result() {
                super();
            }
    
            public static Result of(String name, Double total) {
                Result result = new Result();
                result.name = name;
                result.total = total;
                return result;
            }
        }
    
    }

      

      想参考完整的代码,可以访问 https://github.com/stonehqs/flink-demo 。

     

      有问题,欢迎拍砖。

        

  • 相关阅读:
    CVE-2020-5405 Spring Cloud Config 目录穿越漏洞分析
    CVE-2019-3799spring-cloud-config 目录穿越漏洞复现
    fastjson<=1.2.68的漏洞分析
    【转载】半自动化挖掘request实现多种中间件回显
    整理一下weblogic回显的代码
    CentOS7在python交互模式下输入退回键时出现乱码^H^H
    1 单例设计模式
    用nohup命令实现PHP的多进程
    用进程和线程关系引入操作系统学习
    8.5 归并排序
  • 原文地址:https://www.cnblogs.com/huangqingshi/p/12334315.html
Copyright © 2011-2022 走看看