zoukankan      html  css  js  c++  java
  • spark sql cache

    1.几种缓存数据的方法

    例如有一张hive表叫做activity

    1.CACHE TABLE

    //缓存全表
    sqlContext.sql("CACHE TABLE activity")
    
    //缓存过滤结果
    sqlContext.sql("CACHE TABLE activity_cached as select * from activity where ...")
    

    CACHE TABLE是即时生效(eager)的,如果你想等到一个action操作再缓存数据可以使用CACHE LAZY TABLE,这样操作会直到一个action操作才被触发,例如count(*)

    sqlContext.sql("CACHE LAZY TABLE ...")
    

    取消hive表缓存数据

    sqlContext.sql("UNCACHE TABLE activity")
    

    2.将dataFrame注册成表并缓存

    val df = sqlContext.sql("select * from activity")
    df.registerTempTable("activity_cached")
    sqlContext.cacheTable("activity_cached")
    
    Tip:cacheTable操作是lazy的,需要一个action操作来触发缓存操作。
    

    对应的uncacheTable可以取消缓存

    sqlContext.uncacheTable("activity_cached")
    

    3.缓存dataFrame

    val df = sqlContext.sql("select * from tableName")
    df.cache()
    

    2.缓存结果

    缓存时看到如下提示:

    Added rdd_xx_x in memory on ...
    

    如果内存不足,则会存入磁盘中,提示如下:

    Added rdd_xx_x on disk on ...
    

    缓存数据后可以在Storage上看到缓存的数据

    cache

    3.一些参数

    spark.sql.autoBroadcastJoinThreshold
    

    该参数默认为10M,在进行join等聚合操作时,将小于该值的表broadcast到每台worker,消除了大量的shuffle操作。

    spark.rdd.compress true
    

    将rdd存入mem或disk前再进行一次压缩,效果显著,我使用cacheTable了一张表,没有开启该参数前总共cache了54G数据,开启这个参数后只34G,可是执行速度并没有收到太大的影响。

    spark.sql.shuffle.partitions
    

    这个参数默认为200,是join等聚合操作的并行度,如果有大量的数据进行操作,造成单个任务比较重,运行时间过长的时候,会报如下的错误:

    org.apache.spark.shuffle.FetchFailedException: Connection from /192.168.xx.xxx:53450 closed
    

    这个时候需要提高该值。

  • 相关阅读:
    ZOJ 1002 Fire Net
    Uva 12889 One-Two-Three
    URAL 1881 Long problem statement
    URAL 1880 Psych Up's Eigenvalues
    URAL 1877 Bicycle Codes
    URAL 1876 Centipede's Morning
    URAL 1873. GOV Chronicles
    Uva 839 Not so Mobile
    Uva 679 Dropping Balls
    An ac a day,keep wa away
  • 原文地址:https://www.cnblogs.com/zhangyunlin/p/6168165.html
Copyright © 2011-2022 走看看