zoukankan      html  css  js  c++  java
  • Flink的分布式缓存

    Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等!

    缓存的使用流程:

    使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件!当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!

    【注意】广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上;

    package com.flink.DEMO.dataset
    
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
    import org.apache.flink.configuration.Configuration
    
    import scala.collection.mutable.{ArrayBuffer, ListBuffer}
    import scala.io.Source
    import org.apache.flink.streaming.api.scala._
    /**
      * Created by angel;
      */
    object Distribute_cache {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        //1"开启分布式缓存
        val path = "hdfs://hadoop01:9000/score"
        env.registerCachedFile(path , "Distribute_cache")
    
        //2:加载本地数据
        val clazz:DataSet[Clazz] = env.fromElements(
            Clazz(1,"class_1"),
            Clazz(2,"class_1"),
            Clazz(3,"class_2"),
            Clazz(4,"class_2"),
            Clazz(5,"class_3"),
            Clazz(6,"class_3"),
            Clazz(7,"class_4"),
            Clazz(8,"class_1")
        )
    
        //3:开始进行关联操作
        clazz.map(new MyJoinmap()).print()
      }
    }
    class MyJoinmap() extends RichMapFunction[Clazz , ArrayBuffer[INFO]]{
      private var myLine = new ListBuffer[String]
      override def open(parameters: Configuration): Unit = {
        val file = getRuntimeContext.getDistributedCache.getFile("Distribute_cache")
        val lines: Iterator[String] = Source.fromFile(file.getAbsoluteFile).getLines()
        lines.foreach( line =>{
          myLine.append(line)
        })
      }
    
      //在map函数下进行关联操作
      override def map(value: Clazz):  ArrayBuffer[INFO] = {
        var stoNO = 0
        var subject = ""
        var score = 0.0
        var array = new collection.mutable.ArrayBuffer[INFO]()
        //(学生学号---学科---分数)
        for(str <- myLine){
          val tokens = str.split(",")
          stoNO = tokens(0).toInt
          subject = tokens(1)
          score = tokens(2).toDouble
          if(tokens.length == 3){
            if(stoNO == value.stu_no){
              array += INFO(value.stu_no , value.clazz_no , subject , score)
            }
          }
        }
        array
      }
    }
    //(学号 , 班级) join (学生学号---学科---分数) ==(学号 , 班级 , 学科 , 分数)
    
    
    case class INFO(stu_no:Int , clazz_no:String , subject:String , score:Double)
    case class Clazz(stu_no:Int , clazz_no:String)
  • 相关阅读:
    ytu 2030: 求实数绝对值(水题)
    [PHP] 链表数据结构(单链表)
    PHP将数据写入指定文件中
    PHP获取文件后缀名
    PHP数组序列化和反序列化
    PHP二维数组(或任意维数组)转换成一维数组的方法汇总
    PHP获取文件大小的方法详解
    PHP中嵌套函数被调用时出现报错的问题
    PHP递归排序
    PHP实现简单倒计时
  • 原文地址:https://www.cnblogs.com/niutao/p/10548489.html
Copyright © 2011-2022 走看看