zoukankan      html  css  js  c++  java
  • 初识Flink广播变量broadcast

      Broadcast 广播变量:可以理解为是一个公共的共享变量,我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在每个节点上只会存在一份,而不是在每个并发线程中存在。如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.configuration.Configuration
    import scala.collection.mutable.ListBuffer
    
    object BatchDemoBroadcastScala {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
        //1: 准备需要广播的数据
        val broadData = ListBuffer[Tuple2[String,Int]]()
        broadData.append(("zs",18))
        broadData.append(("ls",20))
        broadData.append(("ww",17))
        //1.1处理需要广播的数据
        val tupleData = env.fromCollection(broadData)
        val toBroadcastData = tupleData.map(tup=>{
          Map(tup._1->tup._2)
        })
    
        val text = env.fromElements("zs","ls","ww")
    
        val result = text.map(new RichMapFunction[String,String] {
    
          var listData: java.util.List[Map[String,Int]] = null
          var allMap  = Map[String,Int]()
    
          override def open(parameters: Configuration): Unit = {
            super.open(parameters)
            this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")
            val it = listData.iterator()
            while (it.hasNext){
              val next = it.next()
              allMap = allMap.++(next)
            }
          }
    
          override def map(value: String) = {
            val age = allMap.get(value).get
            value+","+age
          }
        }).withBroadcastSet(toBroadcastData,"broadcastMapName")
    
        result.print()
      }
    }

    1、设置广播变量
      在某个需要用到该广播变量的算子后调用withBroadcastSet(var1, var2)进行设置,var1为需要广播变量的变量名,var2是自定义变量名,为String类型。注意,被广播的变量只能为DataSet类型,不能为List、Int、String等类型。
    2、

    获取广播变量
    创建该算子对应的富函数类,例如map函数的富函数类是RichMapFunction,该类有两个构造参数,第一个参数为算子输入数据类型,第二个参数为算子输出数据类型。首先创建一个Traversable[_]接口用于接收广播变量并初始化为空,接收类型与算子输入数据类型相对应;然后重写open函数,通过getRuntimeContext.getBroadcastVariable[_](var)获取到广播变量,var即为设置广播变量时的自定义变量名,类型为String,open函数在算子生命周期的初始化阶段便会调用;最后在map方法中对获取到的广播变量进行访问及其它操作。

    参考:

    https://blog.csdn.net/fct2001140269/article/details/84402798

    https://blog.csdn.net/qq_34842671/article/details/80746593

  • 相关阅读:
    inf的设置【知识】
    输入加速【模板】
    floyed算法【最短路】【模板】
    vector的erase函数使用
    欧拉图
    组合索引
    索引的存储
    索引失效
    装饰器和代理模式
    单例模式
  • 原文地址:https://www.cnblogs.com/linkmust/p/10901731.html
Copyright © 2011-2022 走看看