zoukankan      html  css  js  c++  java
  • Spark缓存策略

    当对同一个rdd多次执行action时,如果在磁盘上则每次执行action都会从磁盘将数据加载,如果将其缓存到内存中会提高再次action的读取速度,Spark缓存主要有cache()和persist()两种,当缓存一个rdd时,每一个节点上都会存放这个rdd的partition,当要使用rdd的时候可以直接从内存读出。
    cache源码:
    def cache(self):
            """
            Persist this RDD with the default storage level (C{MEMORY_ONLY}).
            """
            self.is_cached = True
            self.persist(StorageLevel.MEMORY_ONLY)
            return self

    从源码可以看出,cache底层调用的是persist方法,传入的参数是:StorageLevel.MEMORY_ONLY,再看persist()方法:

    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
            self.is_cached = True
            javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
            self._jrdd.persist(javaStorageLevel)
            return self

    persist方法,传入的参数是StorageLevel,从StorageLevel的源码可以看出它的值总共有6种,因此persist()相比cache()在缓存形式上更为丰富,不仅支持内存的方式,还支持内存和磁盘、内存副本等方式。

    StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
    StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
    StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
    StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
    StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
    StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
    StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1)

    持久化到内存和直接从磁盘读取时间对比:

    import os
    import time
    from pyspark import SparkContext, SparkConf
    
    conf = SparkConf()
    sc = SparkContext(conf=conf)
    
    current_dir = os.path.dirname(os.path.realpath(__file__))
    file_path = "{}/name_age.txt".format(current_dir)
    
    
    def cached():
        start_time = time.time()
        text_rdd = sc.textFile("file://{}".format(file_path)).cache()
        text_rdd.count()
        text_rdd.count()
        end_time = time.time()
        print("{}:{}".format("first cache", end_time - start_time))
    
        start1_time = time.time()
        text1_rdd = sc.textFile("file://{}".format(file_path)).cache()
        text1_rdd.count()
        text1_rdd.count()
        end1_time = time.time()
        print("{}:{}".format("second cache", end1_time - start1_time))
    
    
    def uncached():
        start_time = time.time()
        text_rdd = sc.textFile("file://{}".format(file_path))
        text_rdd.count()
        text_rdd.count()
        end_time = time.time()
        print("{}:{}".format("first uncache", end_time - start_time))
    
        start1_time = time.time()
        text1_rdd = sc.textFile("file://{}".format(file_path))
        text1_rdd.count()
        text1_rdd.count()
        end1_time = time.time()
        print("{}:{}".format("second uncache", end1_time - start1_time))
    
    
    sc.stop()
    
    
    
    执行cached()结果:
    first cache:1.7104301452636719                                                  
    second cache:0.2717571258544922
    
    
    执行uncached()结果:
    first uncache:1.4453039169311523                                                
    second uncache:0.49161386489868164

    从执行结果可以看出,当第二次执行rdd.count()时,有cache情况下是0.2717571258544922;无cache情况下是0.49161386489868164,由于我的内存空间不足,所以不太明显,当数据量大且内存充足的时候,持久化到内存的效率会远远高于磁盘。

    对pyspark有兴趣的小伙伴可以关注我的github,spark for python 持续更新

  • 相关阅读:
    “持咒”到底是个什么东西?再论语言和思维关系
    传说中的噪声
    电源噪声(EMI)滤波器的基本原理与应用方法
    Like a rock,like the weather man【转】
    allegro笔记
    Taxicab geometry和Euclidean geometry
    hql和sql的区别
    SpringMVC与Struts2的区别
    ExtJS 学习专题如何应用ExtJS
    编程式事务和声明式事务
  • 原文地址:https://www.cnblogs.com/FG123/p/9748772.html
Copyright © 2011-2022 走看看