zoukankan      html  css  js  c++  java
  • Flink--DateSet的Transformation简单操作

    flatMap函数

    //初始化执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //加载数据
    val data = env.fromElements(("A" , 1) , ("B" , 1) , ("C" , 1))
    //使用trasformation加载这些数据
    //TODO map
    val map_result = data.map(line => line._1+line._2)
    map_result.print()
    //TODO flatmap
    val flatmap_result = data.flatMap(line => line._1+line._2)
    flatmap_result.print()

    练习:如下数据

    A;B;C;D;B;D;C
    B;D;A;E;D;C
    A;B

    要求:统计相邻字符串出现的次数

    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    /**
      * Created by angel;
      */
    object demo {
    /**
    A;B;C;D;B;D;C
    B;D;A;E;D;C
    A;B
    统计相邻字符串出现的次数(A+B , 2) (B+C , 1)....
      * */
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val data = env.fromElements("A;B;C;D;B;D;C;B;D;A;E;D;C;A;B")
        val map_data: DataSet[Array[String]] = data.map(line => line.split(";"))
        //[A,B,C,D] ---"A,B,C,D"
        //[A,B,C,D] ---> (x,1) , (y,1) -->groupBy--->sum--total
        val tupe_data = map_data.flatMap{
          line =>
            for(index <- 0 until line.length-1) yield (line(index)+"+"+line(index+1) , 1)
        }
        val gropudata = tupe_data.groupBy(0)
        val result = gropudata.sum(1)
        result.print()
      }
    }
    View Code

    mapPartition函数

    //TODO mapPartition
    val ele_partition = elements.setParallelism(2)//将分区设置为2
    val partition = ele_partition.mapPartition(line => line.map(x=> x+"======"))//line是每个分区下面的数据
    partition.print()

    mapPartition:是一个分区一个分区拿出来的 好处就是以后我们操作完数据了需要存储到mysql中,这样做的好处就是几个分区拿几个连接,如果用map的话,就是多少条数据拿多少个mysql的连接

    filter函数

    Filter函数在实际生产中特别实用,数据处理阶段可以过滤掉大部分不符合业务的内容,可以极大减轻整体flink的运算压力

    //TODO fileter
    val filter:DataSet[String] = elements.filter(line => line.contains("java"))//过滤出带java的数据
    filter.print()

    reduce函数

    //TODO reduce
     val elements:DataSet[List[Tuple2[String , Int]]] = env.fromElements(List(("java" , 1) , ("scala" , 1) , ("java" , 1)))
      val tuple_map = elements.flatMap(x=> x)//拆开里面的list,编程tuple
      val group_map = tuple_map.groupBy(x => x._1)//按照单词聚合
    val reduce = group_map.reduce((x,y) => (x._1 ,x._2+y._2))
    reduce.print()

    reduceGroup

    reduceGroup是reduce的一种优化方案;

    它会先分组reduce,然后在做整体的reduce;这样做的好处就是可以减少网络IO;

      //TODO reduceGroup
    
      val elements:DataSet[List[Tuple2[String , Int]]] = env.fromElements(List(("java" , 1) ,("java" , 1), ("scala" , 1)))
      val tuple_words = elements.flatMap(x=>x)
      val group_words = tuple_words.groupBy(x => x._1)
      val a = group_words.reduceGroup{
        (in:Iterator[(String,Int)],out:Collector[(String , Int)]) =>
          val result = in.reduce((x, y) => (x._1, x._2+y._2))
          out.collect(result)
      }
      a.print()
    }

    GroupReduceFunction和GroupCombineFunction(自定义函数)

    import collection.JavaConverters._
    class Tuple3GroupReduceWithCombine extends GroupReduceFunction[( String , Int), (String, Int)] with GroupCombineFunction[(String, Int), (String, Int)] {
      override def reduce(values: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
    
        for(in <- values.asScala){
          out.collect((in._1 , in._2))
        }
      }
    
      override def combine(values: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
        val map = new mutable.HashMap[String , Int]()
        var num = 0
        var s = ""
        for(in <- values.asScala){
            num += in._2
            s = in._1
        }
        out.collect((s , num))
      }
    }
    //  TODO GroupReduceFunction  GroupCombineFunction
    val env = ExecutionEnvironment.getExecutionEnvironment
    val elements:DataSet[List[Tuple2[String , Int]]] = env.fromElements(List(("java" , 3) ,("java" , 1), ("scala" , 1)))
    val collection = elements.flatMap(line => line)
    val groupDatas:GroupedDataSet[(String, Int)] = collection.groupBy(line => line._1)
    //在reduceGroup下使用自定义的reduce和combiner函数
    val result = groupDatas.reduceGroup(new Tuple3GroupReduceWithCombine())
    val result_sort = result.collect().sortBy(x=>x._1)
    println(result_sort)

    combineGroup

    使用之前的group操作,比如:reduceGroup或者GroupReduceFuncation;这种操作很容易造成内存溢出;因为要一次性把所有的数据一步转化到位,所以需要足够的内存支撑,如果内存不够的情况下,那么需要使用combineGroup; combineGroup在分组数据集上应用GroupCombineFunction。 GroupCombineFunction类似于GroupReduceFunction,但不执行完整的数据交换。 【注意】:使用combineGroup可能得不到完整的结果而是部分的结果

    import collection.JavaConverters._
    class MycombineGroup extends GroupCombineFunction[Tuple1[String] , (String , Int)]{
      override def combine(iterable: Iterable[Tuple1[String]], out: Collector[(String, Int)]): Unit = {
        var key: String = null
        var count = 0
        for(line <- iterable.asScala){
          key = line._1
          count += 1
        }
        out.collect((key, count))
    
      }
    }
    //TODO  combineGroup
    val input = env.fromElements("a", "b", "c", "a").map(Tuple1(_))
    val combinedWords = input.groupBy(0).combineGroup(new MycombineGroup())
    combinedWords.print()

    Aggregate

    在数据集上进行聚合求最值(最大值、最小值) 注意:Aggregate只能作用于元组上

    //TODO Aggregate
    val data = new mutable.MutableList[(Int, String, Double)]
    data.+=((1, "yuwen", 89.0))
    data.+=((2, "shuxue", 92.2))
    data.+=((3, "yingyu", 89.99))
    data.+=((4, "wuli", 98.9))
    data.+=((1, "yuwen", 88.88))
    data.+=((1, "wuli", 93.00))
    data.+=((1, "yuwen", 94.3))
    //    //fromCollection将数据转化成DataSet
    val input: DataSet[(Int, String, Double)] = env.fromCollection(Random.shuffle(data))
    val output = input.groupBy(1).aggregate(Aggregations.MAX, 2)
    output.print()

    minBy和maxBy

    //TODO MinBy / MaxBy
    val data = new mutable.MutableList[(Int, String, Double)]
    data.+=((1, "yuwen", 90.0))
    data.+=((2, "shuxue", 20.0))
    data.+=((3, "yingyu", 30.0))
    data.+=((4, "wuli", 40.0))
    data.+=((5, "yuwen", 50.0))
    data.+=((6, "wuli", 60.0))
    data.+=((7, "yuwen", 70.0))
    //    //fromCollection将数据转化成DataSet
    val input: DataSet[(Int, String, Double)] = env.fromCollection(Random.shuffle(data))
    val output: DataSet[(Int, String, Double)] = input
      .groupBy(1)
      //求每个学科下的最小分数
      //minBy的参数代表要求哪个字段的最小值
      .minBy(2)
    output.print()

    distinct去重

    //TODO distinct 去重
      val data = new mutable.MutableList[(Int, String, Double)]
      data.+=((1, "yuwen", 90.0))
      data.+=((2, "shuxue", 20.0))
      data.+=((3, "yingyu", 30.0))
      data.+=((4, "wuli", 40.0))
      data.+=((5, "yuwen", 50.0))
      data.+=((6, "wuli", 60.0))
      data.+=((7, "yuwen", 70.0))
      //    //fromCollection将数据转化成DataSet
      val input: DataSet[(Int, String, Double)] = env.fromCollection(Random.shuffle(data))
      val distinct = input.distinct(1)
      distinct.print()

    join

    Flink在操作过程中,有时候也会遇到关联组合操作,这样可以方便的返回想要的关联结果,比如:

    求每个班级的每个学科的最高分数

    //TODO join
        val data1 = new mutable.MutableList[(Int, String, Double)]
        //学生学号---学科---分数
        data1.+=((1, "yuwen", 90.0))
        data1.+=((2, "shuxue", 20.0))
        data1.+=((3, "yingyu", 30.0))
        data1.+=((4, "yuwen", 40.0))
        data1.+=((5, "shuxue", 50.0))
        data1.+=((6, "yingyu", 60.0))
        data1.+=((7, "yuwen", 70.0))
        data1.+=((8, "yuwen", 20.0))
        val data2 = new mutable.MutableList[(Int, String)]
        //学号 ---班级
        data2.+=((1,"class_1"))
        data2.+=((2,"class_1"))
        data2.+=((3,"class_2"))
        data2.+=((4,"class_2"))
        data2.+=((5,"class_3"))
        data2.+=((6,"class_3"))
        data2.+=((7,"class_4"))
        data2.+=((8,"class_1"))
        val input1: DataSet[(Int, String, Double)] = env.fromCollection(Random.shuffle(data1))
        val input2: DataSet[(Int, String)] = env.fromCollection(Random.shuffle(data2))
        //求每个班级下每个学科最高分数
        val joindata = input2.join(input1).where(0).equalTo(0){
          (input2 , input1) => (input2._1 , input2._2 , input1._2 , input1._3)
        }
    //    joindata.print()
    //    println("===================")
        val aggregateDataSet = joindata.groupBy(1,2).aggregate(Aggregations.MAX , 3)
        aggregateDataSet.print()

    cross交叉操作

    和join类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存的操作;

    //TODO Cross 交叉操作,会产生笛卡尔积
    val data1 = new mutable.MutableList[(Int, String, Double)]
    //学生学号---学科---分数
    data1.+=((1, "yuwen", 90.0))
    data1.+=((2, "shuxue", 20.0))
    data1.+=((3, "yingyu", 30.0))
    data1.+=((4, "yuwen", 40.0))
    data1.+=((5, "shuxue", 50.0))
    data1.+=((6, "yingyu", 60.0))
    data1.+=((7, "yuwen", 70.0))
    data1.+=((8, "yuwen", 20.0))
    val data2 = new mutable.MutableList[(Int, String)]
    //学号 ---班级
    data2.+=((1,"class_1"))
    data2.+=((2,"class_1"))
    data2.+=((3,"class_2"))
    data2.+=((4,"class_2"))
    data2.+=((5,"class_3"))
    data2.+=((6,"class_3"))
    data2.+=((7,"class_4"))
    data2.+=((8,"class_1"))
    val input1: DataSet[(Int, String, Double)] = env.fromCollection(Random.shuffle(data1))
    val input2: DataSet[(Int, String)] = env.fromCollection(Random.shuffle(data2))
    val cross = input1.cross(input2){
      (input1 , input2) => (input1._1,input1._2,input1._3,input2._2)
    }
    cross.print()

    union

    将多个DataSet合并成一个DataSet

    【注意】:union合并的DataSet的类型必须是一致的

    //TODO union联合操作
    val elements1 = env.fromElements(("123"))
    val elements2 = env.fromElements(("456"))
    val elements3 = env.fromElements(("123"))
    val union = elements1.union(elements2).union(elements3).distinct(line => line)
    union.print()

    rebalance

    Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况:

    这个时候本来总体数据量只需要10分钟解决的问题,出现了数据倾斜,机器1上的任务需要4个小时才能完成,那么其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;

    所以在实际的工作中,出现这种情况比较好的解决方案就是本节课要讲解的—rebalance(内部使用round robin方法将数据均匀打散。这对于数据倾斜时是很好的选择。)

    举例:

    1:在不使用rebalance的情况下,观察每一个线程执行的任务特点

       val ds = env.generateSequence(1, 3000)
        val rebalanced = ds.filter(_ > 780)
    //    val rebalanced = skewed.rebalance()
        val countsInPartition = rebalanced.map( new RichMapFunction[Long, (Int, Long)] {
          def map(in: Long) = {
            //获取并行时子任务的编号getRuntimeContext.getIndexOfThisSubtask
            (getRuntimeContext.getIndexOfThisSubtask, in)
          }
        })
        countsInPartition.print()
    【数据随机的分发给各个子任务(分区)】

    2:使用rebalance

    //TODO rebalance
    val ds = env.generateSequence(1, 3000)
    val skewed = ds.filter(_ > 780)
    val rebalanced = skewed.rebalance()
    val countsInPartition = rebalanced.map( new RichMapFunction[Long, (Int, Long)] {
      def map(in: Long) = {
        //获取并行时子任务的编号getRuntimeContext.getIndexOfThisSubtask
        (getRuntimeContext.getIndexOfThisSubtask, in)
      }
    })
    countsInPartition.print()

    每隔8一次循环(数据使用轮询的方式在各个子任务中执行)

    first

    //TODO first-取前N个
        val data = new mutable.MutableList[(Int, Long, String)]
        data.+=((1, 1L, "Hi"))
        data.+=((2, 2L, "Hello"))
        data.+=((3, 2L, "Hello world"))
        data.+=((4, 3L, "Hello world, how are you?"))
        data.+=((5, 3L, "I am fine."))
        data.+=((6, 3L, "Luke Skywalker"))
        data.+=((7, 4L, "Comment#1"))
        data.+=((8, 4L, "Comment#2"))
        data.+=((9, 4L, "Comment#3"))
        data.+=((10, 4L, "Comment#4"))
        data.+=((11, 5L, "Comment#5"))
        data.+=((12, 5L, "Comment#6"))
        data.+=((13, 5L, "Comment#7"))
        data.+=((14, 5L, "Comment#8"))
        data.+=((15, 5L, "Comment#9"))
        data.+=((16, 6L, "Comment#10"))
        data.+=((17, 6L, "Comment#11"))
        data.+=((18, 6L, "Comment#12"))
        data.+=((19, 6L, "Comment#13"))
        data.+=((20, 6L, "Comment#14"))
        data.+=((21, 6L, "Comment#15"))
        val ds = env.fromCollection(Random.shuffle(data))
    //    ds.first(10).print()
        //还可以先goup分组,然后在使用first取值
        ds.groupBy(line => line._2).first(2).print()
  • 相关阅读:
    springdataJpa对无主键表或视图查询的支持
    Blynk系列随笔
    arduino系列文章
    Debezium系列随笔
    Kafka系列随笔
    SSAS 收藏
    Saiku 系列
    Mondrian系列
    数据仓库理论学习
    加密解密
  • 原文地址:https://www.cnblogs.com/niutao/p/10548385.html
Copyright © 2011-2022 走看看