zoukankan      html  css  js  c++  java
  • 初识Flink广播变量broadcast

      Broadcast 广播变量:可以理解为是一个公共的共享变量,我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在每个节点上只会存在一份,而不是在每个并发线程中存在。如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.configuration.Configuration
    import scala.collection.mutable.ListBuffer
    
    object BatchDemoBroadcastScala {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
        //1: 准备需要广播的数据
        val broadData = ListBuffer[Tuple2[String,Int]]()
        broadData.append(("zs",18))
        broadData.append(("ls",20))
        broadData.append(("ww",17))
        //1.1处理需要广播的数据
        val tupleData = env.fromCollection(broadData)
        val toBroadcastData = tupleData.map(tup=>{
          Map(tup._1->tup._2)
        })
    
        val text = env.fromElements("zs","ls","ww")
    
        val result = text.map(new RichMapFunction[String,String] {
    
          var listData: java.util.List[Map[String,Int]] = null
          var allMap  = Map[String,Int]()
    
          override def open(parameters: Configuration): Unit = {
            super.open(parameters)
            this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")
            val it = listData.iterator()
            while (it.hasNext){
              val next = it.next()
              allMap = allMap.++(next)
            }
          }
    
          override def map(value: String) = {
            val age = allMap.get(value).get
            value+","+age
          }
        }).withBroadcastSet(toBroadcastData,"broadcastMapName")
    
        result.print()
      }
    }

    1、设置广播变量
      在某个需要用到该广播变量的算子后调用withBroadcastSet(var1, var2)进行设置,var1为需要广播变量的变量名,var2是自定义变量名,为String类型。注意,被广播的变量只能为DataSet类型,不能为List、Int、String等类型。
    2、

    获取广播变量
    创建该算子对应的富函数类,例如map函数的富函数类是RichMapFunction,该类有两个构造参数,第一个参数为算子输入数据类型,第二个参数为算子输出数据类型。首先创建一个Traversable[_]接口用于接收广播变量并初始化为空,接收类型与算子输入数据类型相对应;然后重写open函数,通过getRuntimeContext.getBroadcastVariable[_](var)获取到广播变量,var即为设置广播变量时的自定义变量名,类型为String,open函数在算子生命周期的初始化阶段便会调用;最后在map方法中对获取到的广播变量进行访问及其它操作。

    参考:

    https://blog.csdn.net/fct2001140269/article/details/84402798

    https://blog.csdn.net/qq_34842671/article/details/80746593

  • 相关阅读:
    如何将网格式报表打印成其它样式
    拥有与实力不相称的脾气是种灾难——北漂18年(23)
    8.8.1 Optimizing Queries with EXPLAIN
    mysql 没有rowid 怎么实现根据rowid回表呢?
    secondary index
    8.5.5 Bulk Data Loading for InnoDB Tables 批量数据加载
    mysql 中key 指的是索引
    8.5.4 Optimizing InnoDB Redo Logging 优化InnoDB Redo 日志
    8.5.3 Optimizing InnoDB Read-Only Transactions 优化InnoDB 只读事务
    8.5.1 Optimizing Storage Layout for InnoDB Tables InnoDB表的存储布局优化
  • 原文地址:https://www.cnblogs.com/linkmust/p/10901731.html
Copyright © 2011-2022 走看看