zoukankan      html  css  js  c++  java
  • flink创建视图的几种方式

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;
    
    /**
     * Demonstrates several ways of creating a table (i.e. registering a data source)
     * with the Flink 1.10.0 Table API:
     * <ol>
     *   <li>registering a {@code DataStream} as a temporary view,</li>
     *   <li>using the {@code connect(...)} descriptor API,</li>
     *   <li>executing a {@code CREATE TABLE} DDL statement.</li>
     * </ol>
     *
     * @author gaoxing
     * @since 2020/4/7
     */
    public class CreateView {
    
        /**
         * Builds the stream and table environments, registers one table per approach
         * and then submits the job under the name {@code CreateTable}.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the environment fails to execute the job
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            registerSocketView(env, tableEnv);
            registerKafkaTableWithDescriptor(tableEnv);
            registerKafkaTableWithDdl(tableEnv);
    
            // NOTE(review): no query or sink is defined on any of the registered tables in this
            // example; confirm that execute() has a topology to run in the target setup.
            env.execute("CreateTable");
        }
    
        /**
         * Approach 1: stream to view. Reads text lines from a socket and registers the stream
         * as the temporary view {@code t_socket} with a single field {@code row_data}.
         */
        private static void registerSocketView(StreamExecutionEnvironment env,
                                               StreamTableEnvironment tableEnv) {
            DataStreamSource<String> socketLines = env.socketTextStream("localhost", 8888);
            tableEnv.createTemporaryView("t_socket", socketLines, "row_data");
        }
    
        /**
         * Approach 2: the {@code connect(...)} descriptor API. The call chain is used purely
         * for its side effect: it registers the temporary table {@code kafka_test}.
         */
        private static void registerKafkaTableWithDescriptor(StreamTableEnvironment tableEnv) {
            // NOTE(review): the ZooKeeper and bootstrap-server addresses are empty placeholders;
            // fill them in before running against a real cluster.
            Kafka kafka = new Kafka()
                    .version("universal")
                    .topic("sql-json-test")
                    .startFromEarliest()
                    .property("zookeeper.connect", "")
                    .property("bootstrap.servers", "")
                    .property("group.id", "flink");
    
            // No explicit format schema is declared here: per the original author's note, the
            // format schema is derived from the table schema by default, so it does not need
            // to be spelled out.
            Json json = new Json().failOnMissingField(true);
    
            Schema schema = new Schema()
                    .field("id", DataTypes.STRING())
                    .field("name", DataTypes.STRING());
    
            tableEnv.connect(kafka)
                    .withFormat(json)
                    .withSchema(schema)
                    .inAppendMode()
                    .createTemporaryTable("kafka_test");
        }
    
        /**
         * Approach 3: a {@code CREATE TABLE} SQL statement that declares the table
         * {@code t_kafka} over the same Kafka topic.
         */
        private static void registerKafkaTableWithDdl(StreamTableEnvironment tableEnv) {
            tableEnv.sqlUpdate(
                    "CREATE TABLE t_kafka(" +
                            " id int," +
                            " name string" +
                            " ) WITH (" +
                            " 'connector.type' = 'kafka'," +
                            " 'connector.version' = 'universal'," +
                            " 'connector.topic' = 'sql-json-test', " +
                            " 'connector.startup-mode' = 'earliest-offset'," +
                            " 'connector.properties.zookeeper.connect' = ''," +
                            " 'connector.properties.bootstrap.servers' = ''," +
                            " 'update-mode' = 'append'," +
                            " 'format.type' = 'json'," +
                            " 'format.derive-schema' = 'true'," +
                            " 'format.fail-on-missing-field' = 'true'" +
                            // Closes the WITH ( ... ) clause; without it the DDL is malformed.
                            " )"
            );
        }
    }
    
  • 相关阅读:
    mysql 存储过程 异常处理机制
    Maven 私服打包
    Flink(2):Flink的Source源
    Flink(1):Flink的基础案例
    最后一课
    我的获奖记录及 Important Dates in OI
    目录
    入坑 OI 三周年之际的一些感想
    洛谷 P3781
    Atcoder Typical DP Contest S
  • 原文地址:https://www.cnblogs.com/night-xing/p/12654036.html
Copyright © 2011-2022 走看看