zoukankan      html  css  js  c++  java
  • Flink学习(三) 批流版本的wordcount Scala版本

    批处理代码:

    package com.wyh.wc
    
    import org.apache.flink.api.scala._
    
    /**
      * 批处理代码
      */
    object WordCount {
      def main(args: Array[String]): Unit = {
        //创建一个批处理的一个环境
        val env = ExecutionEnvironment.getExecutionEnvironment
    
        val inputPath = "D:\shujia\shujia006\FlinkWyh\src\main\data\word"
    
        val inputDataSet = env.readTextFile(inputPath)
    
        //分词之后做count
        val wordcountSet = inputDataSet
          .flatMap(lines => lines.split(" "))
          .map((_, 1))
          .groupBy(0)
          .sum(1)
    
        //打印
        wordcountSet.map(x => {
          x._1 + " " + x._2
        }).print()
    
    
      }
    
    }

    流处理代码:

    package com.wyh.wc
    
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala._
    
    object StreamWordCount {
      def main(args: Array[String]): Unit = {
        //创建一个流处理的执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //为了host和port不写死,flink提供了一个方法
        val params = ParameterTool.fromArgs(args)
    
    //    val host = params.get("host")
    //
    //    val port = params.getInt("port")
    
        //env.disableOperatorChaining()//全局打散  一个算子一个任务
        //每一个算子也会有个方法  .disableChaining() 将这个算子单独拿出来
        //还有个方法.startNewChain() 将当前算子之前面和后面 分开
    
        //部署到集群中接收socket数据流
    //    val dataStream: DataStream[String] = env.socketTextStream(host, port)
    
        //接收socket数据流
        val dataStream = env.socketTextStream("localhost", 9999)
    
        //逐一读取数据,打散进行WordCount
        val wordCountStream = dataStream.flatMap(_.split("\s"))
          .filter(_.nonEmpty)
          .map((_, 1))
          .keyBy(0)
          .sum(1)
    
        wordCountStream.print().setParallelism(1)
    
    
        //比批处理多一个步骤
        //真正执行这个任务,启动它的Executor
        env.execute("WordCountStream")
    
    
      }
    
    }
  • 相关阅读:
    在openwrt上初体验PostgreSQL数据库
    Linux学习笔记(7)-系统资源查看
    Linux学习笔记(6)-工作管理
    Linux学习笔记(5)-进程管理
    Linux学习笔记(4)-文本编辑器vi的使用
    linux学习笔记(3)-文件系统
    Linux学习笔记(2)-用户和用户组
    linux学习笔记(1)-文件处理相关命令
    68.vivado与modelsim的关联以及器件库编译
    67.ARP协议
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12872872.html
Copyright © 2011-2022 走看看