zoukankan      html  css  js  c++  java
  • Flink SQL 批处理


    <!--
      Maven dependencies for the FLinkSqlBatch example that follows.
      Every artifact is pinned to Flink version 1.9.0; the "_2.11" suffix on an
      artifactId is the Scala binary version that artifact was built against.
      NOTE(review): the Java example itself imports only Java API classes; whether
      the two Scala artifacts are still needed at runtime by the planner should be
      confirmed before removing them.
    -->
    <dependencies>
    <!-- Table planner module. -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    <!-- Java bridge for the Table API; the example imports org.apache.flink.table.api.java.BatchTableEnvironment (presumably provided here). -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    <!-- Scala bridge for the Table API. -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    <!-- Scala streaming API. -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    <!-- Common Table API classes (no Scala suffix on this artifact). -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.0</version>
    </dependency>
    </dependencies>




    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    public class FLinkSqlBatch {
    public static void main(String[] args) throws Exception {
    //1) 获取执行环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    /**
    * 1.csv
    * channel,subject,refer,reg,ord,pv,uv
    * 朋友圈,三科,H5,100,100,100,100
    * 朋友圈,数学,H5,100,100,100,100
    * 朋友圈,理科,H5,100,100,100,100
    * 朋友圈,编程,H5,100,100,100,100
    * 朋友圈,英语,H5,100,100,100,100
    * 朋友圈,通用,H5,100,100,100,100
    */
    // 2)读取数据
    DataSet<AdPojo> csvInput = env
    .readCsvFile("D:\code\learn\flink-sql\src\main\resources\1.csv")
    .ignoreFirstLine() //忽略第一行
    .pojoType(AdPojo.class, "channel", "subject", "refer", "reg", "ord", "pv", "uv");

    //3)将DataSet转换为Table,并注册为table1
    Table topScore = tableEnv.fromDataSet(csvInput);
    tableEnv.registerTable("table1", topScore);

    //4)自定义sql语句
    Table groupedByCountry = tableEnv.sqlQuery("select channel,subject,refer,reg,ord,pv,uv from table1");
    //5)转换回dataset
    DataSet<AdPojo> result = tableEnv.toDataSet(groupedByCountry, AdPojo.class);
    //6)打印
    result.print();
    }
    }




    AdPojo{channel='朋友圈', subject='英语', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='通用', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='三科', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='理科', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='数学', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='编程', refer='H5', reg='100', ord='100', pv='100', uv='100'}




  • 相关阅读:
    C#磁吸屏幕窗体类库
    准备
    我写的诗
    How to turn off a laptop keyboard
    How to tell which commit a tag points to in Git?
    Why should I care about lightweight vs. annotated tags?
    How to get rid of “would clobber existing tag”
    Facebook, Google and Twitter threaten to leave Hong Kong over privacy law changes
    The need for legislative reform on secrecy orders
    Can a foreign key be NULL and/or duplicate?
  • 原文地址:https://www.cnblogs.com/maoxiangyi/p/11586461.html
Copyright © 2011-2022 走看看