zoukankan      html  css  js  c++  java
  • spark07

    spark07

    spark运行原理:

    RDD Object

    driver提交代码,spark-submit运行main方法。但是没有真正执行,初始化driver端得组件DAGScheduler  TaskScheduler  SchedulerBackEnd.saveAsTextFile出发,DAGScheduler提交任务,解析代码生DAG有向无环图,真正得遇见action算子。

    DAGScheduler进行DAG图的切分stage,从最后一个rdd入手,先前推进,看看dependency得依赖关系,如果是窄依赖则将本阶段rdd得数量加一,如果是shuffle算子那么将阶段加一,得到所有得阶段后,匹配阶段得类型,如果是shuffleMapStage就创建一个TaskSet装入得都是ShuffleMapTask,这个阶段得最后一个rdd得分区数量就是Task任务得数量,这个阶段完毕后,获得得时候一个Seq[TaskSet],将这个任务提交给TaskScheduler

    TaskScheduler是接口,和executor进行交互,心跳机制,其实TaskScheduler就是driver

    Executor通信得组件,找到SchedulerBackEnd这个组件通信

    CoarseGrainedExecutorBackEnd   local  实现类进行任务得提交通信,将每一个task任务进行序列化,executor拿到任务以后要用到一个线程池进行运行,这个线程池没有能力运行一个普通的类,将task任务进行包装,组成一个taskRunner,放入到executor中进行执行。

    调用taskRunner中的run方法执行代码逻辑,每个task中的runTask方法执行逻辑

    spark的高级特性(持久化)

    rdd1-rdd2-rdd3 ..... rdd20 ----消费记录分析  /  快递情况分析

    持久化(缓存)他的意义在于频繁使用的rdd进行存储,不需要再次重新计算

     


    缓存数据,cache()方法 == persist,将rdd缓存起来,下次使用的时候可以直接从缓存中读取数据

    rdd.cache = rdd.persist() == rdd.persist(Memory_Only)

    rdd.persist(StorageLevel.xxxx)

    默认提供了12种存储级别

    通过构造器和构造器种的值不一样,产生了12种级别

     

    userDisk?是不是使用磁盘

    userMemory?是不是使用内存

    userOffHeap?是不是使用堆外内存

    deserialized?是不是不序列化

    replication?缓存rdd的时候缓存几个备份数

    使用方式rdd.persist  rdd.cache()  rdd.persisit(StorageLevel.xxxx)但是不能自定义存储方式,因为构造器是私有化的不能重载

     

     

    rdd.cache是一个懒加载形式,不遇见action算子不会执行,缓存的数据在监控页面种可以查看,能够分别查看每个分区数据的大小和存储的位置

    缓存的数据分别存储到不同的executor种,每个executor种存在一个blockManager的工具管理本地缓存数据

     

    缓存种的数据量远远大于实际数据量?

    因为是rdd的数据

     

    缓存的rdd变成了绿色

     

    数据流向不会发生任何改变,读取数据直接从缓存种读取,不是从dhfs种读取,前面的过程直接跳过,但是流程还要走一遍

    去除缓存的方式

    rdd.unpersit

    如果当前应用停止,则executor进程消失,那么缓存的数据也消失

    总结:

    1. 常用的算子cache() persist() 也就是仅仅使用内存的方式比较常用
    2. 持久化的算子是懒加载的,必须使用行动类算子进行执行才能缓存
    3. 持久化算子在使用的时候依赖关系不会发生任何改变,但是计算过的内容不会重新计算
    4. 如果使用仅内存的方式,可能只缓存一部分的数据

     

    以上的缓存只是针对于一个应用的,如果 应用停止,则缓存数据消失


    checkPoint持久化数据(跨应用的)

     

    缓存数据的时候是将数据放入到hdfs中,所以必须设定存储的文件目录

     

    存储的时候有几个分区就会存储几份数据,这些数据的目录

    /rddCkpt/54c48611-7771-45d4-9ad0-f3ad427c6858/rdd-6

    这个目录的路径和应用的id还有rdd有关,是绑定关系

     

    checkpoint缓存的算子上继续执行任务的时候,这个依赖关系发生了改变,任务的开头从checkpoint开始。

     

    rdd.checkpoint

    rdd.collect

    在使用checkpoint进行rdd缓存的时候,一个action算子产生了两个任务

     

    在使用checkpoint的时候,是将数据存储到hdfs中,collect算子在执行的时候,自己会生成一个job。还会产生一个job专门来处理checkpoint将数据放入到hdfs

    1. 如果使用checkpoint就可以跨应用使用了
    2. 如果执行checkpoint会产生两个应用

    scala> sc.textFile("hdfs://master:9000/aaa.txt")

    res23: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/aaa.txt MapPartitionsRDD[20] at textFile at <console>:25

    scala> res23.cache()

    res24: res23.type = hdfs://master:9000/aaa.txt MapPartitionsRDD[20] at textFile at <console>:25

    scala> res23.checkpoint

    scala> res23.collect

    res26: Array[String] = Array(hello tom, hello jerry, hello rose, hello jack, jack rose)

    在做checkpoint的时候可以先将这个rdd进行cache缓存,在执行checkpoint的时候可以从缓存中读取数据

    以上的缓存都要使用内存,这个内存是spark集群中的内存

    spark集群中的内存管理方案

    object IpTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("ip")
        val sc = new SparkContext(conf)
        val iprdd = sc.textFile("ip.txt")
        val accessrdd = sc.textFile("access.log")
        val iprdd1:RDD[(Long,Long,String)] = iprdd.map(t=>{
          val strs = t.split("\|")
          (strs(2).toLong,strs(3).toLong,strs(6))
          //lowIp  upIp  province
        })
        val accRDD:RDD[Long] = accessrdd.map(t=>{
          val strIp = t.split("\|")(1)
          //StringIp
          ip2Long(strIp)
        })

        val iparr:Array[(Long,Long,String)] = iprdd1.collect()
        //因为嵌套是产生了笛卡儿积,所以不能使用嵌套关系
        //取一条数据去整个规则库中查询出来归属地
        //二分法

        val proRDD = accRDD.map(t=>{
          val province = binarySeach(t,iparr)
          (province,1)
        })

        val result = proRDD.reduceByKey(_+_)

        result.foreach(println)

        result.foreach(t=>{
          val connection  = DriverManager.getConnection("jdbc:mysql://master:3306/spark?characterEncoding=utf-8","root","123456")
          val pap = connection.prepareStatement("create table if not exists visit_log(id int auto_increment,name varchar(20),count int,primary key(id)) charset=utf8")
          pap.execute()
          pap.close()
          val statement = connection.prepareStatement("insert into visit_log(name,count)values(?,?)")
          statement.setString(1,t._1)
          statement.setInt(2,t._2)
          statement.execute()
          statement.close()
          connection.close()
        })

    //    val result = accRDD.map(t=>{
    //      val rdd =  iprdd1.filter(e=>{
    //         e._1<= t && t<=e._2
    //       })
    //      rdd.collect()(0)
    //    })
    //    result.foreach(println)



      }

      def binarySeach(ip:Long,ipArr:Array[(Long,Long,String)]):String={
        var start = 0
        var end = ipArr.length-1
        while(start<=end){
           val middel = (start+end)/2
           if(ipArr(middel)._1<=ip && ip<=ipArr(middel)._2){
              return ipArr(middel)._3
           }
          if(ipArr(middel)._1 > ip){
            end = middel-1
          }
          if(ipArr(middel)._2<ip){
            start = middel+1
          }
        }
        "unknow"
      }

      def ip2Long(ip: String): Long = {
        val fragments = ip.split("[.]")
        var ipNum = 0L
        for (i <- 0 until fragments.length){
          ipNum = fragments(i).toLong | ipNum << 8L
        }
        ipNum
      }
    }

  • 相关阅读:
    洛谷 P1194 飞扬的小鸟 题解
    洛谷 P1197 星球大战 题解
    洛谷 P1879 玉米田Corn Fields 题解
    洛谷 P2796 Facer的程序 题解
    洛谷 P2398 GCD SUM 题解
    洛谷 P2051 中国象棋 题解
    洛谷 P1472 奶牛家谱 Cow Pedigrees 题解
    洛谷 P1004 方格取数 题解
    洛谷 P2331 最大子矩阵 题解
    洛谷 P1073 最优贸易 题解
  • 原文地址:https://www.cnblogs.com/JBLi/p/11528386.html
Copyright © 2011-2022 走看看