  • Flink Optimization Summary

    Flink Optimization

    1. Do Tuples perform better than POJOs? Yes: Flink serializes POJO fields through reflection, while a Tuple's fields (f0, f1, ...) are read and written directly, so Tuple types are cheaper to serialize.

    Have the class extend Tuple instead of declaring plain fields:

    import org.apache.flink.api.java.tuple.Tuple3;

    public class ProvinceEvent extends Tuple3<Long, String, String> {
        // The equivalent POJO fields, now stored positionally as f0/f1/f2:
        //    private Long timestamps;
        //    private String phonenum;
        //    private String province;

        // Flink's TupleSerializer instantiates the class reflectively,
        // so a public no-arg constructor is required.
        public ProvinceEvent() {
        }

        public ProvinceEvent(Long timestamps, String phonenum, String province) {
            // The Tuple3 constructor already assigns f0, f1 and f2
            super(timestamps, phonenum, province);
        }

        public Long getTimestamps() {
            return this.f0;
        }

        public void setTimestamps(Long timestamps) {
            this.f0 = timestamps;
        }

        public String getPhonenum() {
            return this.f1;
        }

        public void setPhonenum(String phonenum) {
            this.f1 = phonenum;
        }

        public String getProvince() {
            return this.f2;
        }

        public void setProvince(String province) {
            this.f2 = province;
        }
    }
    
    

    2. Reuse Flink objects

    Anti-pattern:

    stream
        .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
            @Override
            public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
                long changesCount = ...
                // A new Tuple2 instance is created on every execution
                collector.collect(new Tuple2<>(userName, changesCount));
            }
        });
    

    As you can see, every execution of the apply function creates a new Tuple2 instance, which puts pressure on the garbage collector. One way to fix this is to reuse the same instance over and over:

    stream
        .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
            // Create an instance that we will reuse on every call
            private Tuple2<String, Long> result = new Tuple2<>();

            @Override
            public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
                long changesCount = ...

                // Set fields on an existing object instead of creating a new one
                result.f0 = userName;
                // Auto-boxing!! A new Long value may be created
                result.f1 = changesCount;

                // Reuse the same Tuple2 object
                collector.collect(result);
            }
        });
    

    This is better, but only partly: we no longer create a new Tuple2 on every call, yet auto-boxing still indirectly creates a new Long instance each time. To solve this, Flink provides a number of so-called value classes: IntValue, LongValue, StringValue, FloatValue, and so on. Here is how to use them:

    The best version:

    stream
        .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, LongValue>, String, TimeWindow>() {
            // Create a mutable count instance
            private LongValue count = new LongValue();
            // Assign the mutable tuple once
            private Tuple2<String, LongValue> result = new Tuple2<>("", count);

            @Override
            // Note that we now have a different return type
            public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, LongValue>> collector) throws Exception {
                long changesCount = ...

                // Set fields on the existing objects instead of creating new ones
                result.f0 = userName;
                // Mutate the existing LongValue instead of boxing a new Long
                count.setValue(changesCount);

                // Reuse the same Tuple2 and the same LongValue instance;
                // the object emitted each time is the same
                collector.collect(result);
            }
        });
    
    

    3. Use semantic annotations
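
    Semantic annotations such as @ForwardedFields (DataSet API) tell the optimizer which fields a function forwards unchanged, so existing partitioning and sort orders on those fields can be preserved instead of recomputed. A minimal sketch (the mapper class is illustrative):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Declares that f0 passes through unchanged, so the optimizer can keep
    // any existing partitioning or sort order on f0 across this map.
    @ForwardedFields("f0")
    public class ScaleValue implements MapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> value) {
            // f0 is forwarded as-is; only f1 is changed
            return new Tuple2<>(value.f0, value.f1 * 10);
        }
    }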

    4. Select the join type
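
    When the relative sizes of the join inputs are known, a join hint can pick a cheaper strategy, e.g. broadcasting the small side instead of shuffling both. A minimal sketch using the DataSet API's JoinHint (the datasets and values are made up for illustration):

    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class JoinHintExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Integer, String>> small = env.fromElements(
                    Tuple2.of(1, "a"), Tuple2.of(2, "b"));
            DataSet<Tuple2<Integer, Long>> large = env.fromElements(
                    Tuple2.of(1, 100L), Tuple2.of(2, 200L), Tuple2.of(1, 300L));

            // Broadcast the small input to every parallel instance of the
            // large side instead of shuffling both inputs over the network.
            small.join(large, JoinHint.BROADCAST_HASH_FIRST)
                 .where(0)
                 .equalTo(0)
                 .print();

            // Shortcuts with the same effect when the size skew is known:
            // small.joinWithHuge(large), large.joinWithTiny(small)
        }
    }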

    5. Assign a uid to each operator
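
    Assigning an explicit uid to every stateful operator keeps savepoints restorable after the job graph changes, because state is matched to operators by uid on restore. A minimal sketch (operator names are illustrative):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> upper = env.socketTextStream("localhost", 9999)
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String s) {
                            return s.toUpperCase();
                        }
                    })
                    .uid("to-upper-map")   // stable ID: savepoint state is matched by uid
                    .name("to-upper-map"); // display name shown in the web UI
            upper.print();
            env.execute("uid example");
        }
    }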

    6. Data skew inside a window

    How it works:

    1. First scatter the keys: suppose we rewrite each key as "key-randomNumber", guaranteeing the data is hashed evenly across partitions.

    2. Aggregate the scattered data. We now get partial results such as: (key1-12,1), (key1-13,19), (key1-1,20), (key2-123,11), (key2-123,10).

    3. Strip the random suffix to restore the original key. At this point the data is the aggregated result, no longer the raw input.

    4. keyBy a second time to combine the partial results, and emit them via addSink.

    The Scala code below implements these four steps:
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector

    object ProcessFunctionScalaV2 {

      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.enableCheckpointing(2000)
        val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
        val typeAndData: DataStream[(String, Long)] = stream.map(x => (x.split(",")(0), x.split(",")(1).toLong))
        // Step 1: scatter the key by appending a random suffix
        val dataStream: DataStream[(String, Long)] = typeAndData
          .map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))
        // Step 2: aggregate the scattered data per window
        val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1)
          .timeWindow(Time.seconds(10))
          .aggregate(new CountAggregate())
        keyByAgg.print("first keyBy output")
        // Step 3: strip the random suffix to restore the original key
        val result: DataStream[DataJast] = keyByAgg.map(data => {
          val newKey: String = data.key.substring(0, data.key.indexOf("-"))
          println(newKey)
          DataJast(newKey, data.count)
        }).keyBy(_.key)
          // Step 4: keyBy again and combine the partial results
          .process(new MyProcessFunction())
        result.print("second keyBy output")

        env.execute()
      }

      case class DataJast(key: String, count: Long)

      // After keyBy, sum the values arriving in each window
      class CountAggregate extends AggregateFunction[(String, Long), DataJast, DataJast] {

        override def createAccumulator(): DataJast = {
          println("initializing accumulator")
          DataJast(null, 0)
        }

        override def add(value: (String, Long), accumulator: DataJast): DataJast = {
          if (accumulator.key == null) {
            printf("first element, key:%s,value:%d\n", value._1, value._2)
            DataJast(value._1, value._2)
          } else {
            printf("accumulating, key:%s,value:%d\n", value._1, accumulator.count + value._2)
            DataJast(value._1, accumulator.count + value._2)
          }
        }

        override def getResult(accumulator: DataJast): DataJast = {
          println("window result: " + accumulator)
          accumulator
        }

        override def merge(a: DataJast, b: DataJast): DataJast = {
          DataJast(a.key, a.count + b.count)
        }
      }

      /**
       * Groups by key, counts the data arriving for each key,
       * and reports the totals periodically.
       */
      class MyProcessFunction extends KeyedProcessFunction[String, DataJast, DataJast] {

        val delayTime: Long = 1000L * 30

        lazy val valueState: ValueState[Long] = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("ccount", classOf[Long]))

        override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit = {
          if (valueState.value() == 0) {
            valueState.update(value.count)
            printf("task:%s, first count for this key:%s\n", getRuntimeContext.getIndexOfThisSubtask, value.count)
            val currentTime: Long = ctx.timerService().currentProcessingTime()
            // Register a timer
            ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
          } else {
            valueState.update(valueState.value() + value.count)
            printf("task:%s, updated total:%s\n", getRuntimeContext.getIndexOfThisSubtask, valueState.value())
          }
        }

        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit = {
          // The timer fired; business logic can go here
          printf("task:%s, timer fired, total for the last 30s, key:%s,value:%s\n", getRuntimeContext.getIndexOfThisSubtask, ctx.getCurrentKey, valueState.value())

          // Periodic report done; reset the counter
          valueState.update(0)
          // Register the next timer
          val currentTime: Long = ctx.timerService().currentProcessingTime()
          ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
        }
      }

    }
    

    7. Operator optimization

    1. ReduceFunction / AggregateFunction

    In each window, every arriving element is computed incrementally.

    That is, each record is aggregated the moment it arrives, so only the accumulator needs to be kept in state.

    2. WindowFunction / ProcessWindowFunction

    Buffers all of the window's data first and then computes over the whole collection at once. (Less efficient.)

    In return, it can access the window's start and end time. The sketch below combines both approaches, giving incremental aggregation plus window metadata.
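
    A minimal sketch (the class names CountAgg and AddWindowInfo are illustrative): WindowedStream.aggregate accepts an AggregateFunction together with a ProcessWindowFunction, so elements are pre-aggregated as they arrive while the window function still receives the window metadata.

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class AggregateWithWindowInfo {

        // Incremental count per key: only the running Long lives in window state.
        public static class CountAgg implements AggregateFunction<Tuple2<String, Long>, Long, Long> {
            @Override public Long createAccumulator() { return 0L; }
            @Override public Long add(Tuple2<String, Long> value, Long acc) { return acc + 1; }
            @Override public Long getResult(Long acc) { return acc; }
            @Override public Long merge(Long a, Long b) { return a + b; }
        }

        // Runs once per window with the pre-aggregated count and exposes
        // window metadata such as the end timestamp.
        public static class AddWindowInfo extends ProcessWindowFunction<Long, Tuple3<String, Long, Long>, String, TimeWindow> {
            @Override
            public void process(String key, Context ctx, Iterable<Long> counts, Collector<Tuple3<String, Long, Long>> out) {
                // counts holds exactly one element: the incremental result
                out.collect(Tuple3.of(key, counts.iterator().next(), ctx.window().getEnd()));
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Tuple2<String, Long>> input = env.fromElements(
                    Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));
            input.keyBy(t -> t.f0)
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                 .aggregate(new CountAgg(), new AddWindowInfo())
                 .print();
            env.execute("incremental aggregation with window metadata");
        }
    }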

    8. Batch processing optimization

    • Semantic annotations (see section 3)

    • Use StringValue

    Read text file contents into a DataSet[StringValue] rather than DataSet[String]. StringValue is a mutable string type; storing the text as StringValue sharply reduces the number of String objects created and thus lowers the overhead on the system (see the sketch after this list).

    • Enable object reuse
    env.getConfig().enableObjectReuse();
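
    A minimal sketch of the StringValue read path mentioned above, using the DataSet API's readTextFileWithValue (the input path is hypothetical):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.types.StringValue;

    public class StringValueRead {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // readTextFileWithValue reuses mutable StringValue instances while
            // reading instead of allocating a new String for every line.
            DataSet<StringValue> lines = env.readTextFileWithValue("file:///tmp/input.txt");
            lines.print();
        }
    }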
    

    9. Deduplication optimization

    Use a bitmap.
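
    A minimal sketch, assuming integer IDs and the RoaringBitmap library (org.roaringbitmap, not part of Flink): each ID sets one bit in a compressed bitmap, so duplicates are absorbed cheaply and the distinct count is the bitmap's cardinality.

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.roaringbitmap.RoaringBitmap;

    // Distinct-count via bitmap: adding a duplicate ID is a no-op,
    // and the cardinality equals the number of distinct IDs seen.
    public class DistinctUserCount implements AggregateFunction<Integer, RoaringBitmap, Long> {

        @Override
        public RoaringBitmap createAccumulator() {
            return new RoaringBitmap();
        }

        @Override
        public RoaringBitmap add(Integer userId, RoaringBitmap bitmap) {
            bitmap.add(userId);   // idempotent: duplicates are absorbed
            return bitmap;
        }

        @Override
        public Long getResult(RoaringBitmap bitmap) {
            return (long) bitmap.getCardinality();   // number of distinct IDs
        }

        @Override
        public RoaringBitmap merge(RoaringBitmap a, RoaringBitmap b) {
            a.or(b);              // union of the two ID sets
            return a;
        }
    }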
