zoukankan      html  css  js  c++  java
  • Flink中的算子操作

    一、Connect

    DataStream,DataStream ->  ConnectedStream,连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了同一个流中,内部依然保持各自的数据和形式

    不发生任何变化,两个流相互独立。

    import org.apache.flink.streaming.api.scala._
    
    object Connect {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        var stream01 = env.generateSequence(1,10)
        val stream = env.readTextFile("test001.txt")
        val stream02 = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
        val streamConnect: ConnectedStreams[Long, String] = stream01.connect(stream02)
        //两个流各自处理各自的,互不干扰
        val stream03: DataStream[Any] = streamConnect.map(item => item * 2, item => (item,1L))
    
        stream03.print()
        env.execute("Connect")
      }
    }

    二、CoMap,CoFlatMap

    ConnectedStreams  ->  DataStream:作用于ConnectedStream上,功能与map和flatMap一样,对ConnectedStram中的每一个Stream分别进行map和flatMap

    三、Split 

    import org.apache.flink.streaming.api.scala._
    
    object Split {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream: DataStream[String] = env.readTextFile("test001.txt").flatMap(item => item.split(" "))
        val streamSplit: SplitStream[String] = stream.split(
          word =>
            ("hadoop".equals(word) match {
              case true => List("hadoop") //值等于hadoop的流加入到一个List中
              case false => List("other")//值不等于hadoop的流加入到一个List中
            })
        )
        //取出属于各自部分的流
        val value01: DataStream[String] = streamSplit.select("hadoop")
        val value02: DataStream[String] = streamSplit.select("other")
    
        value01.print()
        value02.print()
    
        env.execute("Split Job")
    
      }
    
    }

    四、Union

    DataStream -> DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新的DataStream。

    注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。

    五、KeyBy(比较重要)

    DataStream -> KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

    把所有相同key的数据聚合在一起

    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.scala._
    
    object KeyBy {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream: DataStream[String] = env.readTextFile("test001.txt").flatMap(item => item.split(" "))
        //将相同key数据进行聚合
        //同一个key的数据都划分到同一个分区中
        val streamKeyBy: KeyedStream[(String, Int), Tuple] = stream.map(item => (item,1)).keyBy(0)
        
        streamKeyBy.print()
        env.execute("KeyBy Job")
    
      }
    }

    六、Reduce

    KeyedStream -> DataStream,一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,

    而不是只返回最后一次聚合的最终结果。

    数据流如何在两个 transformation 组件中传输的?

    一对一流(=spark窄依赖):(比如source=>map过程)保持元素分区和排序

    redistributing流(=spark宽依赖):(map=>keyBy/window 之间,以及keyBy/window与sink之间)改变了流分区。

    每一个算子任务根据所选的转换,向不同的目标子任务发送数据。

    比如:keyBy,根据key的hash值重新分区、broadcast、rebalance(类似shuffle过程)。在一次 redistributing交换中,元素间排序,只针对发送方

    的partition和接收partition方。最终到sink端的排序是不确定的。

  • 相关阅读:
    关于求 p_i != i and p_i != i+1 的方案数的思考过程
    poj 3041 Asteroids 二分图最小覆盖点
    poj 1325 Machine Schedule 最小顶点覆盖
    poj 1011 Sticks 减枝搜索
    poj 1469 COURSES 最大匹配
    zoj 1516 Uncle Tom's Inherited Land 最大独立边集合(最大匹配)
    Path Cover (路径覆盖)
    hdu 3530 SubSequence TwoPoint单调队列维护最值
    zoj 1654 Place the Rebots 最大独立集转换成二分图最大独立边(最大匹配)
    poj 1466 Girls and Boys 二分图最大独立子集
  • 原文地址:https://www.cnblogs.com/ssqq5200936/p/11013477.html
Copyright © 2011-2022 走看看