zoukankan      html  css  js  c++  java
  • flink-demo2

    package cn.irisz.steam
    
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{EnvironmentSettings, TableResult}
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    object Demo2 {
      def main(args: Array[String]): Unit = {
        // 1. env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.BATCH)
        env.setParallelism(1)
        val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val tEnv = StreamTableEnvironment.create(env, settings)
    
        // 2. source
        // val fileSource: DataStream[String] = env.readTextFile("data/aceess.log_20200914.csv")
        tEnv.executeSql(
          """
            |CREATE TABLE log (
            |   `id` Int,
            |   `i_city` String,
            |   `i_country` String,
            |   `i_isp` String,
            |   `i_province` String,
            |   `ip` String,
            |   `length` BigInt,
            |   `method` String,
            |   `referer` String,
            |   `status_code` Int,
            |   `t_hour` Int,
            |   `t_minute` Int,
            |   `t` TIMESTAMP,
            |   `ua` String,
            |   `url` String,
            |   `url_param` String,
            |   `url_path` String,
            |   `version` String,
            |   `xff` String
            |)WITH (
            |   'connector' = 'filesystem',
            |   'path' = 'data/aceess.log_20200914.csv',
            |   'format' = 'csv'
            |)
            |""".stripMargin)
    
        tEnv.executeSql(
          """
            |CREATE TABLE `result` (
            |   `t_hour` Int,
            |   `t_minute` Int,
            |   `cnt` BigInt
            |) WITH (
            |   'connector' = 'print'
            |)
            |""".stripMargin)
    
        // 3. transfer
    
        // 4. sink
    //    logStream.print()
    val result: TableResult = tEnv.sqlQuery(
      """
        |   SELECT t_hour, t_minute, COUNT(1) AS cnt
        |   FROM log
        |   WHERE status_code = 200
        |   GROUP BY t_hour, t_minute
        |""".stripMargin).execute()
    
        result.print()
    
        // 5. execute
        env.execute("calc log count for minute and hour").wait()
    //    tEnv.execute("calc log count for minute and hour")
      }
    }
    
    
    
  • 相关阅读:
    BestCoder 2nd Anniversary/HDU 5718 高精度 模拟
    FZU 2168 前缀和+dp递推
    poj 1088 记忆化搜索
    Codeforces Round #241 (Div. 2) B dp
    poj 3053 优先队列处理
    取消修改
    csv乱码
    iconv()
    cakephp中sql查询between
    虚拟机上linux与windows之间复制粘贴
  • 原文地址:https://www.cnblogs.com/zpzhue/p/14948086.html
Copyright © 2011-2022 走看看