zoukankan      html  css  js  c++  java
  • flink datastream 2 table

    https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/

    import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; // create environments of both APIs StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create a DataStream DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John"); // interpret the insert-only DataStream as a Table Table inputTable = tableEnv.fromDataStream(dataStream); // register the Table object as a view and query it tableEnv.createTemporaryView("InputTable", inputTable); Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable"); // interpret the insert-only Table as a DataStream again DataStream<Row> resultStream = tableEnv.toDataStream(resultTable); // add a printing sink and execute in DataStream API resultStream.print(); env.execute(); // prints: // +I[Alice] // +I[Bob] // +I[John]




    =================================================
    flink map

    DataStream<String> input = ...;
    
    DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return Integer.parseInt(value);
        }
    });
  • 相关阅读:
    SVN资料库转移-----dump和load
    windows Server 2003修改远程连接限制
    oracle定时任务
    Oacle常用语句
    决策树
    Logistic回归
    Matplotlab绘图基础
    基本术语
    看懂执行并优化
    数据分析方法论
  • 原文地址:https://www.cnblogs.com/yjybupt/p/15693295.html
Copyright © 2011-2022 走看看