  • Notes on persistence and serialization in Spark

    1. cache vs. persist

    -rw-r--r--@ 1 hadoop staff 68M 5 17 07:04 access.log

    [Screenshots 2019-05-17 07.12.29, 07.18.02 and 07.35.58]

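    cache/persist are lazy: calling them only marks the RDD, and the data is materialized by the next action. unpersist, by contrast, takes effect immediately.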
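A minimal sketch of the lazy/eager difference, assuming a standalone Scala application with spark-core on the classpath and a local master. The object name `LazyCacheDemo` and the accumulator are illustrative, not from the post. A `LongAccumulator` counts how often the `map` function runs: it stays at 0 after `cache()`, reaches 100 on the first action, stays there on the second action (served from the cache), and doubles once `unpersist` forces a recompute.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object LazyCacheDemo {
  /** Runs the experiment and returns the accumulator value after each step. */
  def run(): Seq[Long] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lazy-cache-demo"))
    try {
      // Counts how many times the map function actually executes
      val computed = sc.longAccumulator("computed")
      val doubled = sc.parallelize(1 to 100, 2).map { x => computed.add(1); x * 2 }

      doubled.cache()                      // only marks the RDD; no job runs yet
      val afterCache = computed.sum        // 0
      // The storage level is recorded immediately, before any data is stored
      assert(doubled.getStorageLevel == StorageLevel.MEMORY_ONLY)

      doubled.count()                      // first action computes and caches both partitions
      val afterFirst = computed.sum        // 100
      doubled.count()                      // read from the cache: map does not run again
      val afterSecond = computed.sum       // 100

      doubled.unpersist(blocking = true)   // takes effect immediately, no action needed
      assert(doubled.getStorageLevel == StorageLevel.NONE)
      doubled.count()                      // recomputed from the lineage
      Seq(afterCache, afterFirst, afterSecond, computed.sum)  // last value: 200
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```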

    // From org.apache.spark.storage.StorageLevel (excerpt).
    // Constructor flag order: useDisk, useMemory, useOffHeap, deserialized, replication
    @DeveloperApi
    class StorageLevel private(
        private var _useDisk: Boolean,
        private var _useMemory: Boolean,
        private var _useOffHeap: Boolean,
        private var _deserialized: Boolean,
        private var _replication: Int = 1)
      extends Externalizable { /* ... */ }

    /**
     * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
     * new storage levels.
     */
    object StorageLevel {
      val NONE = new StorageLevel(false, false, false, false)
      val DISK_ONLY = new StorageLevel(true, false, false, false)
      val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
      val MEMORY_ONLY = new StorageLevel(false, true, false, true)
      val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
      val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
      val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
      val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
      // ...
    }

    // From org.apache.spark.rdd.RDD (excerpt): cache() is simply persist(MEMORY_ONLY)

    /**
     * Persist this RDD with the default storage level (`MEMORY_ONLY`).
     */
    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

    /**
     * Persist this RDD with the default storage level (`MEMORY_ONLY`).
     */
    def cache(): this.type = persist()

    /**
     * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
     *
     * @param blocking Whether to block until all blocks are deleted.
     * @return This RDD.
     */
    def unpersist(blocking: Boolean = true): this.type = {
      logInfo("Removing RDD " + id + " from persistence list")
      sc.unpersistRDD(id, blocking)
      storageLevel = StorageLevel.NONE
      this
    }

    /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
    def getStorageLevel: StorageLevel = storageLevel
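To see how the constructor flags above map onto the named levels, the public accessors on `StorageLevel` can be read directly. This sketch only needs spark-core on the classpath (no SparkContext); the object name `StorageLevelFlags` is illustrative.

```scala
import org.apache.spark.storage.StorageLevel

object StorageLevelFlags {
  /** The five constructor flags of a level, in constructor order. */
  def flags(level: StorageLevel): (Boolean, Boolean, Boolean, Boolean, Int) =
    (level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, level.replication)

  def main(args: Array[String]): Unit = {
    // MEMORY_ONLY keeps deserialized Java objects: deserialized = true
    println(flags(StorageLevel.MEMORY_ONLY))            // (false,true,false,true,1)
    // *_SER levels store serialized bytes; the _2 suffix means two replicas
    println(flags(StorageLevel.MEMORY_AND_DISK_SER_2))  // (true,true,false,false,2)

    // A level can also be looked up by name or rebuilt from its flags
    val byName = StorageLevel.fromString("MEMORY_AND_DISK_SER")
    val byFlags = StorageLevel(true, true, false, false, 1)
    println(byName == StorageLevel.MEMORY_AND_DISK_SER && byFlags == byName)  // true
  }
}
```

The `_SER` flag (`deserialized = false`) is what makes the serializer choice matter for cached data, which is why the serialization tests in the next section persist with `MEMORY_ONLY_SER`.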
    

    2. Serialization test: Java vs. Kryo

    Three serialization setups are tested with class User: the default Java serialization, Kryo without registering User, and Kryo with User registered.
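The three setups can also be compared without a cluster by calling Spark's serializers directly. This is a sketch under stated assumptions: it uses a hypothetical `case class User` whose fields are vals (so their values are actually written), it serializes a single object rather than a cached RDD, and the object name `SerializedSizes` is illustrative. Exact byte counts depend on the Spark and Kryo versions, so only the ordering (Java largest, registered Kryo smallest) is worth relying on.

```scala
import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

// Hypothetical record type; unlike the post's User, its fields are vals
case class User(id: Int, username: String, age: String)

object SerializedSizes {
  private def size(buf: ByteBuffer): Int = buf.remaining()

  /** Serialized size in bytes of one User: (Java, Kryo unregistered, Kryo registered). */
  def sizes(user: User): (Int, Int, Int) = {
    // SparkConf(false): ignore any spark.* system properties
    val plainConf = new SparkConf(false)
    val registeredConf = new SparkConf(false).registerKryoClasses(Array(classOf[User]))

    // Java serialization writes a full class descriptor (class name, field names and types)
    val javaBytes = size(new JavaSerializer(plainConf).newInstance().serialize(user))
    // Unregistered: Kryo stores the class name together with the object
    val kryoBytes = size(new KryoSerializer(plainConf).newInstance().serialize(user))
    // Registered: Kryo stores a small numeric class ID instead of the name
    val kryoRegBytes = size(new KryoSerializer(registeredConf).newInstance().serialize(user))
    (javaBytes, kryoBytes, kryoRegBytes)
  }

  def main(args: Array[String]): Unit = {
    val (javaBytes, kryoBytes, kryoRegBytes) = sizes(User(1, "name1", "1"))
    println(s"java=$javaBytes kryo(unregistered)=$kryoBytes kryo(registered)=$kryoRegBytes")
  }
}
```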



    Default Java serialization of class User

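    import scala.collection.mutable.ListBuffer
    import org.apache.spark.storage.StorageLevel

    // Note: id/username/age are plain constructor params (not vals), so they are not stored as fields
    class User(id: Int, username: String, age: String) extends Serializable

    // One million User objects built on the driver
    val users = new ListBuffer[User]
    for (i <- 1 to 1000000) {
      users += new User(i, "name" + i, i.toString)
    }
    val usersRDD = sc.parallelize(users)
    // Keep cached partitions as serialized bytes, so the serializer determines their size
    usersRDD.persist(StorageLevel.MEMORY_ONLY_SER)
    // persist is lazy: this action materializes the cache (its size shows in the web UI's Storage tab)
    usersRDD.foreach(println(_))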
    

    Kryo serialization without registering class User

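    import org.apache.spark.{SparkConf, SparkContext}
    val sparkConf = new SparkConf()
    // Switch to Kryo without registering User: Kryo then stores the full class name with each object
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // A running SparkContext ignores later conf changes: stop the shell's sc and start a new one
    sc.stop()
    val sc = new SparkContext(sparkConf)
    // Re-run the same test on the new context
    val usersRDD = sc.parallelize(users)
    usersRDD.persist(StorageLevel.MEMORY_ONLY_SER)
    usersRDD.foreach(println(_))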
    

    Kryo serialization with class User registered

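    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[User]))  // Kryo writes a numeric ID, not the class name
    sc.stop()                             // SparkContext copies its conf at startup,
    val sc = new SparkContext(sparkConf)  // so restart it before re-running the persist test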
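One way to confirm that registration is really in effect is Spark's `spark.kryo.registrationRequired` setting, which the post does not use: with it set to `true`, Kryo rejects any class that was not registered instead of silently falling back to writing the class name. A sketch with hypothetical `User` and `Order` classes and an illustrative object name `RegistrationCheck`:

```scala
import scala.reflect.ClassTag
import scala.util.Try
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Hypothetical record types for the check
case class User(id: Int, username: String, age: String)
case class Order(id: Int)

object RegistrationCheck {
  /** True if Kryo can serialize `value` under a conf that requires registration. */
  def serializes[T: ClassTag](value: T): Boolean = {
    val conf = new SparkConf(false)
      .set("spark.kryo.registrationRequired", "true")  // fail fast on unregistered classes
      .registerKryoClasses(Array(classOf[User]))       // only User is registered
    Try(new KryoSerializer(conf).newInstance().serialize(value)).isSuccess
  }

  def main(args: Array[String]): Unit = {
    println(serializes(User(1, "name1", "1")))  // User is registered
    println(serializes(Order(1)))               // Order was never registered
  }
}
```

In a real job the same setting makes a forgotten registration surface as a task failure rather than as silently larger cached data.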
    
  • Original post: https://www.cnblogs.com/suixingc/p/spark-zhong-chi-jiu-hua-he-xu-lie-hua-xue-xi.html