zoukankan      html  css  js  c++  java
  • Flink 之分流Select与Split

    1、数据格式

    sensor_1,1547718199,35.8
    sensor_6,1547718201,15.4
    sensor_7,1547718202,6.7
    sensor_10,1547718205,38.1
    sensor_1,1547718206,32
    sensor_1,1547718208,36.2
    sensor_1,1547718210,29.7
    sensor_1,1547718213,30.9

    2、处理主类

    package com.yangwj.api
    
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    /**
     * Demo job that reads sensor readings from a text file and routes them into
     * "high" and "low" temperature streams using Flink's `split` / `select` API,
     * then prints each selected stream.
     *
     * Each input line is expected to have three comma-separated fields, e.g.
     * `sensor_1,1547718199,35.8`.
     *
     * NOTE(review): `SensorReading` is not defined in this file; it is presumably a
     * case class declared elsewhere in this package whose third field is
     * `temperature` - confirm.
     * NOTE(review): `split` / `select` are deprecated in newer Flink releases in
     * favour of side outputs - verify against the Flink version in use.
     *
     * @author yangwj
     * @date 2021/1/4 22:08
     * @version 1.0
     */
    object StreamTest {

      /**
       * Builds and runs the streaming job.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Triple-quoted literal keeps the Windows backslashes verbatim. In an ordinary
        // "..." literal, sequences such as \J or \g are invalid escapes (compile error)
        // and \f / \r would silently become control characters.
        val inputFile: String = """G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"""
        val input: DataStream[String] = env.readTextFile(inputFile)

        // Parse each CSV line into a SensorReading.
        val dataStream = input.map(data => {
          // Trim every field so stray whitespace around a value does not break toLong.
          val arr: Array[String] = data.split(",").map(_.trim)
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        })

        // Split the stream: tag each reading "high" when its temperature is above 30.0,
        // otherwise "low".
        val splitStream: SplitStream[SensorReading] = dataStream.split(data => {
          if (data.temperature > 30.0) Seq("high") else Seq("low")
        })

        // Select sub-streams by tag; passing both tags selects every reading.
        val highDataStream: DataStream[SensorReading] = splitStream.select("high")
        val lowDataStream: DataStream[SensorReading] = splitStream.select("low")
        val allDataStream: DataStream[SensorReading] = splitStream.select("high","low")

        // The print argument is the label prefixed to each printed record.
        highDataStream.print("high")
        lowDataStream.print("low")
        allDataStream.print("all")
        // Nothing runs until execute() is called.
        env.execute("select test")
      }


    }
  • 相关阅读:
    hello , world Tkinter代码描述
    Tkinter 类
    什么是Tkinter?
    99_恢复二叉搜索树
    总结eclipse中常用好用的快捷键或者自定义一下快捷键:
    封装与职责分离的开发思维
    正在学习的路上
    串比较
    坚持的力量 第二十篇
    串连接
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14238023.html
Copyright © 2011-2022 走看看