zoukankan      html  css  js  c++  java
  • Flink SQL 批处理


    <!--
      Maven dependencies for the Flink SQL batch example below.
      All artifacts are pinned to Flink 1.9.0; the "_2.11" suffix on an
      artifactId selects the build for Scala 2.11.
        - flink-table-planner:          Table/SQL planner
        - flink-table-api-java-bridge:  Java bridge for the Table API
                                        (the example imports BatchTableEnvironment
                                        from org.apache.flink.table.api.java)
        - flink-table-api-scala-bridge: Scala bridge for the Table API
        - flink-streaming-scala:        Scala streaming API
        - flink-table-common:           shared Table API classes
      NOTE(review): the example is pure Java; the two Scala artifacts are
      presumably not required for it - confirm before removing.
    -->
    <dependencies>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.0</version>
    </dependency>
    </dependencies>




    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    public class FLinkSqlBatch {
    public static void main(String[] args) throws Exception {
    //1) 获取执行环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    /**
    * 1.csv
    * channel,subject,refer,reg,ord,pv,uv
    * 朋友圈,三科,H5,100,100,100,100
    * 朋友圈,数学,H5,100,100,100,100
    * 朋友圈,理科,H5,100,100,100,100
    * 朋友圈,编程,H5,100,100,100,100
    * 朋友圈,英语,H5,100,100,100,100
    * 朋友圈,通用,H5,100,100,100,100
    */
    // 2)读取数据
    DataSet<AdPojo> csvInput = env
    .readCsvFile("D:\code\learn\flink-sql\src\main\resources\1.csv")
    .ignoreFirstLine() //忽略第一行
    .pojoType(AdPojo.class, "channel", "subject", "refer", "reg", "ord", "pv", "uv");

    //3)将DataSet转换为Table,并注册为table1
    Table topScore = tableEnv.fromDataSet(csvInput);
    tableEnv.registerTable("table1", topScore);

    //4)自定义sql语句
    Table groupedByCountry = tableEnv.sqlQuery("select channel,subject,refer,reg,ord,pv,uv from table1");
    //5)转换回dataset
    DataSet<AdPojo> result = tableEnv.toDataSet(groupedByCountry, AdPojo.class);
    //6)打印
    result.print();
    }
    }




    AdPojo{channel='朋友圈', subject='英语', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='通用', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='三科', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='理科', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='数学', refer='H5', reg='100', ord='100', pv='100', uv='100'}
    AdPojo{channel='朋友圈', subject='编程', refer='H5', reg='100', ord='100', pv='100', uv='100'}




  • 相关阅读:
    Centos7下zabbix部署(三)自定义监控项
    Centos下zabbix部署(二)agent安装并设置监控
    Centos7下yum安装zabbix-server的部署(一)
    java 动态代理
    java 签名类 Signature
    java 证书 .cer 和 .pfx
    Mustache(3)
    Mustache(2)
    Mustache 使用心得总结
    微信小程序开发
  • 原文地址:https://www.cnblogs.com/maoxiangyi/p/11586461.html
Copyright © 2011-2022 走看看