zoukankan      html  css  js  c++  java
  • Flink 的广播变量

    Flink 支持广播变量,就是将数据广播到具体的 taskmanager 上,数据存储在内存中,这样可以减缓大量的 shuffle 操作;

    比如在数据 join 阶段,不可避免的就是大量的 shuffle 操作,我们可以把其中一个 dataSet 广播出去,一直加载到 taskManager 的内存中,可以直接在内存中拿数据,避免了大量的 shuffle,导致集群性能下降;

    广播变量创建后,它可以运行在集群中的任何 function 上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节

    点获取到的值都是一致的。

    一句话解释,可以理解为是一个公共的共享变量,我们可以把一个 dataset数据集广播出去,然后不同的 task 在节点上都能够获取到,这个数据在每个节

    点上只会存在一份。如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset 数据)。

    注意:因为广播变量是要把 dataset 广播到内存中,所以广播的数据量不能太大,否则会出现 OOM 这样的问题

    • Broadcast:Broadcast 是通过 withBroadcastSet(dataset,string)来注册的
    • Access:通过 getRuntimeContext().getBroadcastVariable(String)访问广播变量

       

    操作步骤

    1:初始化数据

    DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)

    2:广播数据

    .withBroadcastSet(toBroadcast, "broadcastSetName");

    3:获取数据

    Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");

       

      

    package com.starzy

       

    import org.apache.flink.api.common.functions.RichMapFunction

    import org.apache.flink.api.scala.ExecutionEnvironment

    import org.apache.flink.configuration.Configuration

    import org.apache.flink.api.scala._

    import scala.collection.mutable

    import scala.collection.mutable.ArrayBuffer

    import scala.util.Random

       

    object BrodCast {

    def main(args: Array[String]): Unit = {

    val env: ExecutionEnvironment = ExecutionEnvironment. getExecutionEnvironment

       

    //TODO data2 join data3 的数据,使用广播变量完成 的数据,使用广播变量完成

    val data2 = new mutable.MutableList[(Int, Long, String)]

    data2.+=((1, 1L, "Hi"))

    data2.+=((2, 2L, "Hello"))

    data2.+=((3, 2L, "Hello world"))

    val ds1 = env.fromCollection(Random.shuffle(data2))

    val data3 = new mutable.MutableList[(Int, Long, Int, String, Long)]

    data3.+=((1, 1L, 0, "Hallo", 1L))

    data3.+=((2, 2L, 1, "Hallo Welt", 2L))

    data3.+=((2, 3L, 2, "Hallo Welt wie", 1L))

    val ds2 = env.fromCollection(Random.shuffle(data3))

       

       

    //todo 使用内部类 RichMapFunction ,提供 open map ,可以完成 join 的操作 的操作

    val result = ds1.map(new RichMapFunction[(Int , Long , String) , ArrayBuffer[(Int , Long , String , String)]] {

       

    var brodCast :mutable.Buffer[(Int, Long, Int, String, Long)] = null

    override def open(parameters: Configuration): Unit = {

    import scala.collection.JavaConverters._

    //asScala 需要使用隐式转换

    brodCast = this.getRuntimeContext.getBroadcastVariable[(Int, Long, Int, String, Long)]("ds2").asScala

    }

    override def map(value: (Int, Long, String)):ArrayBuffer[(Int , Long , String , String)] = {

    val toArray: Array[(Int, Long, Int, String, Long)] = brodCast .toArray

    val array = new mutable.ArrayBuffer[(Int , Long , String , String)]

    var index = 0

    var a:(Int, Long, String, String) = null

    while(index < toArray.size){

    if(value._2 == toArray(index)._5){

    a = (value._1 , value._2 , value._3 , toArray(index)._4)

    array += a

    }

    index = index + 1

    }

    array

    }

    }).withBroadcastSet(ds2 , "ds2")

    println (result.collect())

    }

    }

       

       

       

       

       

  • 相关阅读:
    shell编程基础(六): 透彻解析查找命令find
    shell编程基础(五): 正则表达式及其使用
    shell编程基础(三): 位置参数与shell脚本的输入输出
    shell编程基础(二): shell脚本语法之分支语句和循环语句
    shell编程基础(一): 基本变量和基本符号
    SpringCloud学习(一):微服务简介
    ubuntu 安装bazel
    numpy reshape resize用法
    L0、L1及L2范数
    linux常用的搜索命令
  • 原文地址:https://www.cnblogs.com/starzy/p/10601468.html
Copyright © 2011-2022 走看看