zoukankan      html  css  js  c++  java
  • spark07

    spark07

    spark运行原理:

    RDD Object

    driver提交代码,spark-submit运行main方法。但是没有真正执行,初始化driver端得组件DAGScheduler  TaskScheduler  SchedulerBackEnd.saveAsTextFile出发,DAGScheduler提交任务,解析代码生DAG有向无环图,真正得遇见action算子。

    DAGScheduler进行DAG图的切分stage,从最后一个rdd入手,先前推进,看看dependency得依赖关系,如果是窄依赖则将本阶段rdd得数量加一,如果是shuffle算子那么将阶段加一,得到所有得阶段后,匹配阶段得类型,如果是shuffleMapStage就创建一个TaskSet装入得都是ShuffleMapTask,这个阶段得最后一个rdd得分区数量就是Task任务得数量,这个阶段完毕后,获得得时候一个Seq[TaskSet],将这个任务提交给TaskScheduler

    TaskScheduler是接口,和executor进行交互,心跳机制,其实TaskScheduler就是driver

    Executor通信得组件,找到SchedulerBackEnd这个组件通信

    CoarseGrainedExecutorBackEnd   local  实现类进行任务得提交通信,将每一个task任务进行序列化,executor拿到任务以后要用到一个线程池进行运行,这个线程池没有能力运行一个普通的类,将task任务进行包装,组成一个taskRunner,放入到executor中进行执行。

    调用taskRunner中的run方法执行代码逻辑,每个task中的runTask方法执行逻辑

    spark的高级特性(持久化)

    rdd1-rdd2-rdd3 ..... rdd20 ----消费记录分析  /  快递情况分析

    持久化(缓存)他的意义在于频繁使用的rdd进行存储,不需要再次重新计算

     


    缓存数据,cache()方法 == persist,将rdd缓存起来,下次使用的时候可以直接从缓存中读取数据

    rdd.cache = rdd.persist() == rdd.persist(Memory_Only)

    rdd.persist(StorageLevel.xxxx)

    默认提供了12种存储级别

    通过构造器和构造器种的值不一样,产生了12种级别

     

    userDisk?是不是使用磁盘

    userMemory?是不是使用内存

    userOffHeap?是不是使用堆外内存

    deserialized?是不是不序列化

    replication?缓存rdd的时候缓存几个备份数

    使用方式rdd.persist  rdd.cache()  rdd.persisit(StorageLevel.xxxx)但是不能自定义存储方式,因为构造器是私有化的不能重载

     

     

    rdd.cache是一个懒加载形式,不遇见action算子不会执行,缓存的数据在监控页面种可以查看,能够分别查看每个分区数据的大小和存储的位置

    缓存的数据分别存储到不同的executor种,每个executor种存在一个blockManager的工具管理本地缓存数据

     

    缓存种的数据量远远大于实际数据量?

    因为是rdd的数据

     

    缓存的rdd变成了绿色

     

    数据流向不会发生任何改变,读取数据直接从缓存种读取,不是从dhfs种读取,前面的过程直接跳过,但是流程还要走一遍

    去除缓存的方式

    rdd.unpersit

    如果当前应用停止,则executor进程消失,那么缓存的数据也消失

    总结:

    1. 常用的算子cache() persist() 也就是仅仅使用内存的方式比较常用
    2. 持久化的算子是懒加载的,必须使用行动类算子进行执行才能缓存
    3. 持久化算子在使用的时候依赖关系不会发生任何改变,但是计算过的内容不会重新计算
    4. 如果使用仅内存的方式,可能只缓存一部分的数据

     

    以上的缓存只是针对于一个应用的,如果 应用停止,则缓存数据消失


    checkPoint持久化数据(跨应用的)

     

    缓存数据的时候是将数据放入到hdfs中,所以必须设定存储的文件目录

     

    存储的时候有几个分区就会存储几份数据,这些数据的目录

    /rddCkpt/54c48611-7771-45d4-9ad0-f3ad427c6858/rdd-6

    这个目录的路径和应用的id还有rdd有关,是绑定关系

     

    checkpoint缓存的算子上继续执行任务的时候,这个依赖关系发生了改变,任务的开头从checkpoint开始。

     

    rdd.checkpoint

    rdd.collect

    在使用checkpoint进行rdd缓存的时候,一个action算子产生了两个任务

     

    在使用checkpoint的时候,是将数据存储到hdfs中,collect算子在执行的时候,自己会生成一个job。还会产生一个job专门来处理checkpoint将数据放入到hdfs

    1. 如果使用checkpoint就可以跨应用使用了
    2. 如果执行checkpoint会产生两个应用

    scala> sc.textFile("hdfs://master:9000/aaa.txt")

    res23: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/aaa.txt MapPartitionsRDD[20] at textFile at <console>:25

    scala> res23.cache()

    res24: res23.type = hdfs://master:9000/aaa.txt MapPartitionsRDD[20] at textFile at <console>:25

    scala> res23.checkpoint

    scala> res23.collect

    res26: Array[String] = Array(hello tom, hello jerry, hello rose, hello jack, jack rose)

    在做checkpoint的时候可以先将这个rdd进行cache缓存,在执行checkpoint的时候可以从缓存中读取数据

    以上的缓存都要使用内存,这个内存是spark集群中的内存

    spark集群中的内存管理方案

    object IpTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("ip")
        val sc = new SparkContext(conf)
        val iprdd = sc.textFile("ip.txt")
        val accessrdd = sc.textFile("access.log")
        val iprdd1:RDD[(Long,Long,String)] = iprdd.map(t=>{
          val strs = t.split("\|")
          (strs(2).toLong,strs(3).toLong,strs(6))
          //lowIp  upIp  province
        })
        val accRDD:RDD[Long] = accessrdd.map(t=>{
          val strIp = t.split("\|")(1)
          //StringIp
          ip2Long(strIp)
        })

        val iparr:Array[(Long,Long,String)] = iprdd1.collect()
        //因为嵌套是产生了笛卡儿积,所以不能使用嵌套关系
        //取一条数据去整个规则库中查询出来归属地
        //二分法

        val proRDD = accRDD.map(t=>{
          val province = binarySeach(t,iparr)
          (province,1)
        })

        val result = proRDD.reduceByKey(_+_)

        result.foreach(println)

        result.foreach(t=>{
          val connection  = DriverManager.getConnection("jdbc:mysql://master:3306/spark?characterEncoding=utf-8","root","123456")
          val pap = connection.prepareStatement("create table if not exists visit_log(id int auto_increment,name varchar(20),count int,primary key(id)) charset=utf8")
          pap.execute()
          pap.close()
          val statement = connection.prepareStatement("insert into visit_log(name,count)values(?,?)")
          statement.setString(1,t._1)
          statement.setInt(2,t._2)
          statement.execute()
          statement.close()
          connection.close()
        })

    //    val result = accRDD.map(t=>{
    //      val rdd =  iprdd1.filter(e=>{
    //         e._1<= t && t<=e._2
    //       })
    //      rdd.collect()(0)
    //    })
    //    result.foreach(println)



      }

      def binarySeach(ip:Long,ipArr:Array[(Long,Long,String)]):String={
        var start = 0
        var end = ipArr.length-1
        while(start<=end){
           val middel = (start+end)/2
           if(ipArr(middel)._1<=ip && ip<=ipArr(middel)._2){
              return ipArr(middel)._3
           }
          if(ipArr(middel)._1 > ip){
            end = middel-1
          }
          if(ipArr(middel)._2<ip){
            start = middel+1
          }
        }
        "unknow"
      }

      def ip2Long(ip: String): Long = {
        val fragments = ip.split("[.]")
        var ipNum = 0L
        for (i <- 0 until fragments.length){
          ipNum = fragments(i).toLong | ipNum << 8L
        }
        ipNum
      }
    }

  • 相关阅读:
    计算属性(computed)、方法(methods)和侦听属性(watch)的区别与使用场景
    DatagramChannel
    IIS发布问题集锦
    ObjectStateManager 中已存在具有同一键的对象。ObjectStateManager 无法跟踪具有相同键的多个对象
    MVC教程--MiniProfiler.EF监控调试MVC和EF的性能
    关于EF输出sql的执行日志
    Entity Framework扩展库
    C#中特性,以及应用场景(收藏链接)
    ajax原理
    .net中session无效
  • 原文地址:https://www.cnblogs.com/JBLi/p/11528386.html
Copyright © 2011-2022 走看看