zoukankan      html  css  js  c++  java
  • spark sql cache

    1.几种缓存数据的方法

    例如有一张hive表叫做activity

    1.CACHE TABLE

    //缓存全表
    sqlContext.sql("CACHE TABLE activity")
    
    //缓存过滤结果
    sqlContext.sql("CACHE TABLE activity_cached as select * from activity where ...")
    

    CACHE TABLE是即时生效(eager)的,如果你想等到一个action操作再缓存数据可以使用CACHE LAZY TABLE,这样操作会直到一个action操作才被触发,例如count(*)

    sqlContext.sql("CACHE LAZY TABLE ...")
    

    取消hive表缓存数据

    sqlContext.sql("UNCACHE TABLE activity")
    

    2.将dataFrame注册成表并缓存

    val df = sqlContext.sql("select * from activity")
    df.registerTempTable("activity_cached")
    sqlContext.cacheTable("activity_cached")
    
    Tip:cacheTable操作是lazy的,需要一个action操作来触发缓存操作。
    

    对应的uncacheTable可以取消缓存

    sqlContext.uncacheTable("activity_cached")
    

    3.缓存dataFrame

    val df = sqlContext.sql("select * from tableName")
    df.cache()
    

    2.缓存结果

    缓存时看到如下提示:

    Added rdd_xx_x in memory on ...
    

    如果内存不足,则会存入磁盘中,提示如下:

    Added rdd_xx_x on disk on ...
    

    缓存数据后可以在Storage上看到缓存的数据

    cache

    3.一些参数

    spark.sql.autoBroadcastJoinThreshold
    

    该参数默认为10M,在进行join等聚合操作时,将小于该值的表broadcast到每台worker,消除了大量的shuffle操作。

    spark.rdd.compress true
    

    将rdd存入mem或disk前再进行一次压缩,效果显著,我使用cacheTable了一张表,没有开启该参数前总共cache了54G数据,开启这个参数后只34G,可是执行速度并没有收到太大的影响。

    spark.sql.shuffle.partitions
    

    这个参数默认为200,是join等聚合操作的并行度,如果有大量的数据进行操作,造成单个任务比较重,运行时间过长的时候,会报如下的错误:

    org.apache.spark.shuffle.FetchFailedException: Connection from /192.168.xx.xxx:53450 closed
    

    这个时候需要提高该值。

  • 相关阅读:
    安装系统时碰上hal.dll文件丢失或损坏问题
    系统无法停止USB接口的“通用卷”的解决方法
    提取嵌入excel或word中flash的VBA代码
    打印机的PCL驱程和PS驱程的区别!
    spoolsv.exe占用资源的解决方法
    检测硬盘与内存中的隐藏病毒
    关于Windows默认共享的一些认识
    SmartClient + WebServices 开发 1
    常用 ajax 框架比较摘自网络
    AJAX 框架 Prototype
  • 原文地址:https://www.cnblogs.com/xiaomaohai/p/6158039.html
Copyright © 2011-2022 走看看