zoukankan      html  css  js  c++  java
  • Flink之Transform操作

    import org.apache.flink.api.common.functions.FilterFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object TransformTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) //设置全局并行度为1

    import org.apache.flink.api.scala._
    val streamFromFile = env.readTextFile("sensor1.txt")

    //1.基本转换算子和简单聚合算子
    val dataStream = streamFromFile.map(data => {
    val dataArray = data.split(",")
    SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    }
    )
    //注意观察结果,flink是来一条数据处理一条,所以不会只看到最终求和后的结果
    dataStream.keyBy(0).sum(2).print()
    dataStream.keyBy("id").sum("temperature").print() //方式二
    //例子:输出当前传感器最新的温度加10,而时间戳是上一次数据的时间戳加1
    dataStream.keyBy(0).reduce((x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10)).print() //x和y分别代表当前值和新来的值

    //2.多流转换算子
    //split分流
    val splitStream = dataStream.split(sensorData => {
    if (sensorData.temperature > 30)
    Seq("high")
    else
    Seq("low")
    })
    val high = splitStream.select("high")
    val low = splitStream.select("low")
    val all = splitStream.select("high", "low")
    high.print("high temperature")
    low.print("low temperature")
    all.print("all")

    //合并: connect和union
    /*
    * 1.union之前两个流的类型必须是一样, connect可以不一样, 在之后的coMap中再去调整成为一样的
    * 2. connect只能操作两个流, union可以操作多个
    */
    val warning = high.map(x => (x.id, x.temperature))
    val connectedStream = warning.connect(low)
    val coMap = connectedStream.map(
    warningData => (warningData._1, warningData._2, "warning"),
    safeData => (safeData.id, "safe")
    )
    coMap.print()

    val unionStream = high.union(low)
    unionStream.print()

    // streamFromFile.map(data => {
    // val len = data.split(",")
    // len(0) + " " + len(1)
    // }).print()

    //自定义函数类
    dataStream.filter(new MyFilter).print()

    env.execute("transform test")
    }
    }

    class MyFilter() extends FilterFunction[SensorReading] {
    override def filter(t: SensorReading): Boolean = {
    t.id.startsWith("sensor_1")
    }
    }

    有帮助的欢迎评论打赏哈,谢谢!
  • 相关阅读:
    板子们~缓慢更新
    Hello World!
    [SHOI2008]堵塞的交通traffic
    [JSOI2008]最大数
    [SCOI2005]扫雷
    [HAOI2007]上升序列
    [HAOI2007]理想的正方形
    [SCOI2003]字符串折叠
    [HAOI2008]移动玩具
    [BJOI2006]狼抓兔子
  • 原文地址:https://www.cnblogs.com/wddqy/p/12172284.html
Copyright © 2011-2022 走看看