zoukankan      html  css  js  c++  java
  • FlinkStream第一个代码WordCount

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    /**
      * Streaming word count over a socket text source.
      *
      * Reads lines of text from a socket, splits each line on single spaces,
      * and prints a running count per word.
      *
      * Command-line arguments (parsed with [[ParameterTool]]):
      *   - `--host`  socket host name; defaults to `192.168.0.20`
      *   - `--port`  socket port; defaults to `777`
      */
    object SteamWordCount {
      // Endpoint used when no --host / --port is supplied. These are the values
      // that were previously hard-coded in the socketTextStream call.
      private val DefaultHost = "192.168.0.20"
      private val DefaultPort = 777

      /**
        * Job entry point.
        *
        * @param args command-line arguments, e.g. `--host localhost --port 7777`
        */
      def main(args: Array[String]): Unit = {
        // Create the stream-processing execution environment.
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Optionally set the parallelism:
        // env.setParallelism(2)

        // Extract the socket host name and port from the command-line arguments,
        // falling back to the defaults when they are not given.
        val params = ParameterTool.fromArgs(args)
        val host = params.get("host", DefaultHost)
        val port = params.getInt("port", DefaultPort)

        // Receive a socket text stream from the configured endpoint
        // (previously the parsed host/port were ignored).
        val inputDataStream: DataStream[String] = env.socketTextStream(host, port)

        // Transform and aggregate: line -> words -> (word, 1) -> running sum per word.
        inputDataStream
          .flatMap(_.split(" "))
          .filter(_.nonEmpty) // consecutive spaces produce empty tokens; drop them
          .map(word => (word, 1))
          .keyBy(_._1) // key-selector form; positional keyBy(0) is deprecated
          .sum(1)
          .print()

        env.execute("word count")
      }
    }
    author@nohert
  • 相关阅读:
    join函数——Gevent源码分析
    代理上网(ssh 动态端口转发)
    内核热patch
    技术债
    mysql 隔离级别与间隙锁等
    python type
    django : related_name and related_query_name
    ssh 卡主
    logistics regression
    __new__ 和 __init__
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928152.html
Copyright © 2011-2022 走看看