zoukankan      html  css  js  c++  java
  • Link Table API JAVA_STREAM_DEMO


    <dependencies>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.0</version>
    <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.0</version>
    <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.0</version>
    <!--<scope>provided</scope>-->
    </dependency>

    </dependencies>







    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    /**
    * FLink Java Stream Table API DEMO
    *
    * @author: create by maoxiangyi
    * @version: v1.0
    */
    public class WordCountSql_Stream {

    public static void main(String[] args) throws Exception {
    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

    DataStreamSource<WC> myDataSource = fsEnv.addSource(new SourceFunction<WC>() {
    @Override
    public void run(SourceContext<WC> sourceContext) throws Exception {
    while (true) {
    Thread.sleep(1000);
    sourceContext.collect(new WC("Hello", 1));
    }
    }

    @Override
    public void cancel() {
    // nothing
    }
    });

    fsTableEnv.registerDataStream("WordCount", myDataSource);

    // run a SQL query on the Table and retrieve the result as a new Table
    Table table = fsTableEnv.sqlQuery(
    "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

    // 使用flinkSQL处理实时数据当我们把表转化成流的时候,需要使用toAppendStream与toRetractStream这两个方法。
    // 稍不注意可能直接选择了toAppendStream。
    //追加模式:只有在动态Table仅通过INSERT更改修改时才能使用此模式,即它仅附加,并且以前发出的结果永远不会更新。
    // 如果更新或删除操作使用追加模式会失败报错
    //缩进模式:始终可以使用此模式。返回值是boolean类型。
    // 它用true或false来标记数据的插入和撤回,返回true代表数据插入,false代表数据的撤回
    // 按照官网的理解如果数据只是不断添加,可以使用追加模式,
    // 其余方式则不可以使用追加模式,而缩进模式侧可以适用于更新,删除等场景
    // https://blog.csdn.net/aa518189/article/details/87816139
    DataStream<Tuple2<Boolean, WC>> stream = fsTableEnv.toRetractStream(table, WC.class);

    stream.print();

    fsTableEnv.execute("flink stream table job");
    }
    }




    10> (true,word='Hello', frequency=1)
    10> (false,word='Hello', frequency=1)
    10> (true,word='Hello', frequency=2)
    10> (false,word='Hello', frequency=2)
    10> (true,word='Hello', frequency=3)
    10> (false,word='Hello', frequency=3)
    10> (true,word='Hello', frequency=4)
    10> (false,word='Hello', frequency=4)
    10> (true,word='Hello', frequency=5)
    10> (false,word='Hello', frequency=5)
    10> (true,word='Hello', frequency=6)
    10> (false,word='Hello', frequency=6)
    10> (true,word='Hello', frequency=7)





  • 相关阅读:
    解决org.openqa.selenium.WebDriverException: Unable to connect to host 127.0.0.1 on port 7055 after 45000 ms org.springframework.beans.BeanInstantiation
    jsp学习---css基础知识学习,float,position,padding,div,margin
    jsp学习---mvc模式介绍和el表达式,jstl标签库的使用入门
    java 查询 mongodb 中的objectid
    jsp学习---使用jsp和JavaBean实现超简单网页计算器
    jsp学习--JavaBean定义和在Jsp中使用JavaBean
    jsp学习--如何定位错误和JSP和Servlet的比较
    jsp学习--JSP运行原理,九大隐式对象和JSP常用标签
    jsp学习--基本语法和基础知识
    android开发学习---layout布局、显示单位和如何进行单元测试
  • 原文地址:https://www.cnblogs.com/maoxiangyi/p/11869393.html
Copyright © 2011-2022 走看看