RDD创建
每个RDD被分为多个分区,这些分区运行在集群的不同节点上。
用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序里driver分发驱动器程序中的对象集合(list和set)。textFile()和parallelize().
RDD支持两种类型的操作:转换操作和行动操作。惰性计算,只有行动操作才真正计算。
RDD持久化,RDD.persist()让Spark把这个RDD缓存在内存中,在之后的行动操作中,可以重用这些数据。也可以把RDD缓存到磁盘中。
两种读取数据的方式
lines = sc.parallelize(["asda","asdsa"])//创建RDD的parallelize方法,python
JavaRDD<String> lines = sc.parallelize(Arrays.asList("",""));//Java的parallelize方法
更常用的是从外部存储中读取数据创建RDD。
lines = sc.textFile("")//python
val lines = sc.textFile("")//scala
JavaRDD<String> lines = sc.textFile("ssa");//Java
RDD操作
//python
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lamdda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)//两个RDD合并操作
Spark会使用lineage graph(DAG)来表示RDD之间的依赖关系。
用take()来收集RDD的元素,提取badLinesRDD的10个元素。
for line in badLinesRDD.take(10):
print line
collect()函数获取整个RDD的数据
把数据写入HDFS和Amazon S3,可以使用saveAsTextFile()、saveAsSequenceFile()
转换操作传入函数
在Python中,有三种方式把函数传递给Spark:
- lambda表达式,大多使用lambda
- 顶层函数
- 定义的局部函数
word = rdd.filter(lambda x: "error" in x)//lambda
def containsError(s):
return "error" in s
word = rdd.filter(containError)//局部函数
在Scala中,可以把定义的内联函数、方法的引用或静态方法传递给Spark。传递的函数及其引用的数据需要是可序列化的。
在Java中,传递org.apache.spark.api.java.function中的函数接口对象。
转换函数
filter() map() flatMap() groupByKey() reduceByKey()
map(),接收一个函数,把这个函数用于RDD中的每个元素,把函数的返回结果作为RDD元素对应的值。返回值的类型和输入值的类型不需要一样。
filter(),接收一个函数,把RDD中满足该函数的元素放入新的RDD中返回。
flatMap(),返回一个包含各个迭代器可访问的所有元素的RDD,相当于执行map()然后flat拍扁。(map返回一行一行,flatMap返回一个词一个词)
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()//返回hello
- groupByKey(),根据相同的键,分组,返回[key,iterable(value)]
- reduceByKey(func),相同的键,把value加起来,返回[key,value],函数func,对v操作,相加或相乘等
- RDD.distinct()生成一个只包含不同元素的新RDD,开销很大,需要shuffle。
- union()两个RDD合并成一个;intersection()只返回两个RDD都有的元素。
- subtract(other)返回只存在第一个RDD不存在第二个RDD中所有元素组成的RDD,也要shuffle。
如果缓存数据太多,内存放不下,Spark会用LRU策略把最老的分区从内存中移除。写入磁盘。
行动操作
count() collect() first() take(n) reduce(func) foreach(func)等,遇到行动操作要开始计算。
- count(),返回数据集中元素个数
- reduce(func),接收一个函数作为参数,操作RDD的元素数据并返回相同类型的结果元素
rdd=[1,2,3,4,5]
sum = rdd.reduce(lambda x, y : x + y)
//sum = 15
- collect(),以数组的形式返回数据集的所哟u元素,即把RDD内容返回
- first(),返回第一个元素
- take(n)返回RDD的n个元素
- foreach(func)对RDD中的每个元素进行函数func操作
持久化
避免多次计算同一个RDD,可以对数据进行持久化,计算出RDD的节点会分别保存它们分区数据,在scala和Java中,persist()默认把数据缓存在JVM堆空间。unpersist()持久化RDD从缓存中移除。
- persist()不会马上持久化,遇到行动操作了,才会持久化。持久化后的RDD会被保留在计算节点的内存中,被后面的行动操作反复使用。
- 策略为LRU。
rdd.cache()//会调用persist(MEMORY_ONLY)