zoukankan      html  css  js  c++  java
  • flink-demo2

    package cn.irisz.steam
    
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{EnvironmentSettings, TableResult}
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    object Demo2 {
      def main(args: Array[String]): Unit = {
        // 1. env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.BATCH)
        env.setParallelism(1)
        val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val tEnv = StreamTableEnvironment.create(env, settings)
    
        // 2. source
        // val fileSource: DataStream[String] = env.readTextFile("data/aceess.log_20200914.csv")
        tEnv.executeSql(
          """
            |CREATE TABLE log (
            |   `id` Int,
            |   `i_city` String,
            |   `i_country` String,
            |   `i_isp` String,
            |   `i_province` String,
            |   `ip` String,
            |   `length` BigInt,
            |   `method` String,
            |   `referer` String,
            |   `status_code` Int,
            |   `t_hour` Int,
            |   `t_minute` Int,
            |   `t` TIMESTAMP,
            |   `ua` String,
            |   `url` String,
            |   `url_param` String,
            |   `url_path` String,
            |   `version` String,
            |   `xff` String
            |)WITH (
            |   'connector' = 'filesystem',
            |   'path' = 'data/aceess.log_20200914.csv',
            |   'format' = 'csv'
            |)
            |""".stripMargin)
    
        tEnv.executeSql(
          """
            |CREATE TABLE `result` (
            |   `t_hour` Int,
            |   `t_minute` Int,
            |   `cnt` BigInt
            |) WITH (
            |   'connector' = 'print'
            |)
            |""".stripMargin)
    
        // 3. transfer
    
        // 4. sink
    //    logStream.print()
    val result: TableResult = tEnv.sqlQuery(
      """
        |   SELECT t_hour, t_minute, COUNT(1) AS cnt
        |   FROM log
        |   WHERE status_code = 200
        |   GROUP BY t_hour, t_minute
        |""".stripMargin).execute()
    
        result.print()
    
        // 5. execute
        env.execute("calc log count for minute and hour").wait()
    //    tEnv.execute("calc log count for minute and hour")
      }
    }
    
    
    
  • 相关阅读:
    C++ 指针 new delete int*与string
    61.Android适配的那些P事(转)
    60.Android通用流行框架大全
    Android Studio配置指南总结
    大数据学习资源(下)
    大数据学习资源(上)
    59.Android开源项目及库 (转)
    Linux 简介
    7款应用最广泛的Linux桌面环境盘点
    58. Android一些开发习惯总结
  • 原文地址:https://www.cnblogs.com/zpzhue/p/14948086.html
Copyright © 2011-2022 走看看