zoukankan      html  css  js  c++  java
  • flink-demo2

    package cn.irisz.steam
    
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{EnvironmentSettings, TableResult}
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    object Demo2 {
      def main(args: Array[String]): Unit = {
        // 1. env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.BATCH)
        env.setParallelism(1)
        val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val tEnv = StreamTableEnvironment.create(env, settings)
    
        // 2. source
        // val fileSource: DataStream[String] = env.readTextFile("data/aceess.log_20200914.csv")
        tEnv.executeSql(
          """
            |CREATE TABLE log (
            |   `id` Int,
            |   `i_city` String,
            |   `i_country` String,
            |   `i_isp` String,
            |   `i_province` String,
            |   `ip` String,
            |   `length` BigInt,
            |   `method` String,
            |   `referer` String,
            |   `status_code` Int,
            |   `t_hour` Int,
            |   `t_minute` Int,
            |   `t` TIMESTAMP,
            |   `ua` String,
            |   `url` String,
            |   `url_param` String,
            |   `url_path` String,
            |   `version` String,
            |   `xff` String
            |)WITH (
            |   'connector' = 'filesystem',
            |   'path' = 'data/aceess.log_20200914.csv',
            |   'format' = 'csv'
            |)
            |""".stripMargin)
    
        tEnv.executeSql(
          """
            |CREATE TABLE `result` (
            |   `t_hour` Int,
            |   `t_minute` Int,
            |   `cnt` BigInt
            |) WITH (
            |   'connector' = 'print'
            |)
            |""".stripMargin)
    
        // 3. transfer
    
        // 4. sink
    //    logStream.print()
    val result: TableResult = tEnv.sqlQuery(
      """
        |   SELECT t_hour, t_minute, COUNT(1) AS cnt
        |   FROM log
        |   WHERE status_code = 200
        |   GROUP BY t_hour, t_minute
        |""".stripMargin).execute()
    
        result.print()
    
        // 5. execute
        env.execute("calc log count for minute and hour").wait()
    //    tEnv.execute("calc log count for minute and hour")
      }
    }
    
    
    
  • 相关阅读:
    SpringBoot 项目瘦身
    对比两个文本的异同
    Spring 事务不起作用的场景
    Controller 层数据校验实现思路
    Notify 类的实现思路
    backup: 使用 vim 时一定会用到的设置 --for-myself
    exercise: 反射获取指定的属性值 --CSharp
    exercise: 序列化和反序列化Xml --CSharp
    前缀
    华罗庚的数学思想
  • 原文地址:https://www.cnblogs.com/zpzhue/p/14948086.html
Copyright © 2011-2022 走看看