  • Flink Broadcast Variables

    Flink supports broadcast variables: a DataSet is broadcast to every TaskManager and held in memory there, which cuts down on a large amount of shuffling.

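    Take a join, for example. A join normally involves heavy shuffling, which drags down cluster performance. If one of the two DataSets is broadcast instead, it stays loaded in each TaskManager's memory and the join can read it locally, so that shuffle is avoided.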

    Note that the broadcast DataSet is held in memory, so it must not be too large; otherwise the job can run out of memory (OOM).
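
    The join case described above can be sketched in plain Scala, without Flink, to show what each task does once it holds a full copy of the small side. This is only an illustration under assumptions: the object name BroadcastLookupSketch, the sample rows, and the choice to index the small side in a Map are all hypothetical and are not taken from Flink itself.

```scala
// Plain Scala sketch (no Flink): a join against a locally held copy of the small side.
object BroadcastLookupSketch {
  // Small side as (id, label, key). With a broadcast variable, every task
  // would hold a full in-memory copy of this collection.
  val small: Seq[(Int, String, Long)] = Seq(
    (1, "Hallo", 1L),
    (2, "Hallo Welt", 2L),
    (3, "Hallo Welt wie", 1L)
  )

  // Index the small side by join key once (the analogue of doing this in open()).
  val labelsByKey: Map[Long, Seq[String]] =
    small.groupBy(_._3).map { case (key, rows) => key -> rows.map(_._2) }

  // Join a single record of the large side using only local memory.
  def joinOne(rec: (Int, Long, String)): Seq[(Int, Long, String, String)] =
    labelsByKey
      .getOrElse(rec._2, Seq.empty)
      .map(label => (rec._1, rec._2, rec._3, label))

  def main(args: Array[String]): Unit = {
    val large = Seq((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"))
    // Each record is joined independently, so no record has to move between tasks.
    large.flatMap(joinOne).foreach(println)
  }
}
```

    Because every record of the large side is matched against local data only, the large side never needs to be repartitioned by key. The cost is that the whole small side, plus its index, must fit in memory on every task.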

    Broadcast: register the broadcast set with withBroadcastSet(dataset, string), where the string is the name it is registered under.
    
    Access: read it inside a rich function with getRuntimeContext().getBroadcastVariable(String), passing the same name.
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer
    import scala.util.Random

    /**
      * Created by angel;
      */
    object BrodCast {
      def main(args: Array[String]): Unit = {
        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
        // Join data2 with data3 using a broadcast variable
        val data2 = new mutable.MutableList[(Int, Long, String)]
        data2.+=((1, 1L, "Hi"))
        data2.+=((2, 2L, "Hello"))
        data2.+=((3, 2L, "Hello world"))
        val ds1 = env.fromCollection(Random.shuffle(data2))
        val data3 = new mutable.MutableList[(Int, Long, Int, String, Long)]
        data3.+=((1, 1L, 0, "Hallo", 1L))
        data3.+=((2, 2L, 1, "Hallo Welt", 2L))
        data3.+=((2, 3L, 2, "Hallo Welt wie", 1L))
        val ds2 = env.fromCollection(Random.shuffle(data3))
        // An anonymous RichMapFunction provides open and map, which is enough to do the join
        val result = ds1.map(new RichMapFunction[(Int, Long, String), ArrayBuffer[(Int, Long, String, String)]] {

          // Local copy of the broadcast set, filled in open()
          var brodCast: mutable.Buffer[(Int, Long, Int, String, Long)] = null

          override def open(parameters: Configuration): Unit = {
            import scala.collection.JavaConverters._
            // asScala comes from the JavaConverters implicit conversions
            brodCast = this.getRuntimeContext.getBroadcastVariable[(Int, Long, Int, String, Long)]("ds2").asScala
          }

          override def map(value: (Int, Long, String)): ArrayBuffer[(Int, Long, String, String)] = {
            val array = new mutable.ArrayBuffer[(Int, Long, String, String)]
            // Match the second field of the ds1 record against the fifth field of each broadcast row
            for (row <- brodCast if value._2 == row._5) {
              array += ((value._1, value._2, value._3, row._4))
            }
            array
          }
        }).withBroadcastSet(ds2, "ds2") // register ds2 under the name read in open()
        println(result.collect())
      }
    }
  • Original article: https://www.cnblogs.com/niutao/p/10548481.html