Spark RDD 是惰性求值的。
如果简单地对RDD 调用行动操作,Spark 每次都会重算RDD 以及它的所有依赖。这在迭代算法中消耗格外大。
换句话来说就是
当DAG图遇到转化操作的时候是不求值的。
只有当遇到行动操作的时候才会求值,但是每次求值都是从头到尾来求的。
并不会从上一个行动操作为起点来计算。这样一来迭代计算就会重复计算好多数值。
我们可以通过持久化(缓存)机制避免这种重复计算的开销。
使用persist()方法对一个RDD标记为持久化。
之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算成RDD并把它持久化。
而是要等到遇到下一个行动操作真正发生产生计算的时候,才会把计算结果持久化。
持久化后的RDD将被保留在计算节点的内存中,可以被后面的行动操作重复使用。
import org.apache.spark._ object MyRdd { def main(args:Array[String]): Unit ={ //初始化配置:设置主机名和程序主类的名字 val conf = new SparkConf().setMaster("local").setAppName("MyRdd"); //通过conf来创建sparkcontext val sc = new SparkContext(conf); val list = List("Hadoop","Spark","Hive"); val rdd = sc.parallelize(list); rdd.persist();//调用了持久化进行标记,但是这里并不会缓存rdd,因为rdd还没有计算生成 println(rdd.count());//标记后第一次的行动操作count()产生计算值,才会执行上边的rdd.persist() println(rdd.collect());//标记后的第二次行动操作,不需要从头开始重新计算,只需要使用缓存中的rdd } }
注意persist()方法是有持久化级别参数的。
1.persist(MEMORY_ONLY):表示将RDD作为反序列化对象存储于JVM中,如果内存不足,就按照LRU原则替换内存中的内容。
2.persist(MEMORY_AND_DISK):表示将RDD作为反序列化的对象存储在JVM中,超出的分区将存放在磁盘中。
我们日常中经常使用cache()方法,这个方法会调用persist(MEMORY_ONLY)。例如,rdd.cache()
我们可以通过unpersist()方法手动的把放在缓存中的持久化对象移除。