-
Spark 中的RDD 就是一个不可变的分布式对象集合。每个RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含Python、Java、Scala中任意类型的对象,甚至可以包含用户自定义的对象。
-
用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(比如list 和set)。
-
RDD支持两种类型的操作:转化操作和行动操作。转化操作会由一个RDD 生成一个新的RDD。行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如HDFS)中。
-
转化操作和行动操作的区别在于Spark 计算RDD 的方式不同。
-
默认情况下,Spark的RDD会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个RDD,可以使用RDD.persist()让Spark 把这个RDD 缓存下来。在第一次对持久化的RDD 计算之后,Spark 会把RDD 的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在之后的行动操作中,就可以重用这些数据了。
-
cache() 与使用默认存储级别调用persist() 是一样的。
总的来说,每个Spark 程序或shell 会话都按如下方式工作。
1. 从外部数据创建出输入RDD。
2. 使用诸如filter() 这样的转化操作对RDD 进行转化,以定义新的RDD。
3. 告诉Spark 对需要被重用的中间结果RDD 执行persist() 操作。
4. 使用行动操作(例如count()和first()等)来触发一次并行计算,Spark 会对计算进行优化后再执行。
创建RDD
Spark 提供了两种创建RDD的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。
创建RDD 最简单的方式就是把程序中一个已有的集合传给SparkContext
的parallelize()
方法。但除了开发原型和测试时,这种方式用得并不多,毕竟这种方式需要把你的整个数据集先放在一台机器的内存中。更常用的方式是从外部存储中读取数据来创建RDD。如: val lines = sc.textFile("/path/to/README.md")
RDD操作
RDD 支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一
个新的RDD 的操作,比如map()
和filter()
,而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如count()
和first()
。它们的返回值类型:转化操作返回的是RDD,而行动操作返回的是其他的数据类型。
转化操作
RDD的转化操作是返回新RDD的操作。转化出来的RDD是惰性求值的,只有在行动操作中用到这些RDD 时才会被计算。许多转化操作都是针对各个元
素的,也就是说,这些转化操作每次只会操作RDD 中的一个元素。不过并不是所有的转
化操作都是这样的。
你从已有的RDD 中派生出新的RDD,Spark 会使用**谱系
图(lineage graph)来记录这些不同RDD 之间的依赖关系**。Spark 需要用这些信息来按需
计算每个RDD,也可以依靠谱系图在持久化的RDD 丢失部分数据时恢复所丢失的数据。
行动操作
行动操作是第二种类型的RDD 操作,它们会把最终求得的结
果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,它
们会强制执行那些求值必须用到的RDD 的转化操作。
一些常用的行动操作函数:
- count()
:统计返回结果
- take()
:来收集RDD 中的一些元素
- collect()
: 用来获取整个RDD 中的数据。如果你的程序把RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时,就可以使用它。记住,只有当你的整个数据集能在单台机器的内存中放得下时,才能使用collect(),因此,collect() 不能用在大规模数据集上。
- saveAsTextFile()、saveAsSequenceFile()
:规模较大的RDD 不能通过collect()收集到驱动器进程中时,可以使用saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行动操作来把RDD 的数据内容以各种自带的格式保存起来,写到诸如HDFS 或Amazon S3 这样的分布式的存储系统中。
注:每当我们调用一个新的行动操作时,整个RDD都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化。
惰性求值
RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前Spark 不会
开始计算。惰性求值意味着当我们对RDD 调用转化操作(例如调用map())时,操作不会立即执行。
相反,Spark 会在内部记录下所要求执行的操作的相关信息。我们不应该把RDD 看作存
放着特定数据的数据集,而最好把每个**RDD 当作我们通过转化操作构建出来的、记录如
何计算数据的指令列表**。把数据读取到RDD 的操作也同样是惰性的。
Spark 使用惰性求值,这样就可以把一些操作合并到一起来减少计算数据的步骤。
向spark传递参数
python
在Python 中,我们有三种方式来把函数传递给Spark。传递比较短的函数时,可以使用
lambda 表达式来传递。除了lambda 表达式,我们也可以传递顶
层函数或是定义的局部函数。
word = rdd.filter(lambda s: "error" in s)
def containsError(s):
return "error" in s
word = rdd.filter(containsError)
- 1
- 2
- 3
- 4
- 5
传递函数时需要小心的一点是,Python 会在你不经意间把函数所在的对象也序列化传出
去。当你传递的对象是某个对象的成员,或者包含了对某个对象中一个字段的引用时(例
如self.field),Spark 就会把整个对象发到工作节点上,这可能比你想传递的东西大得多
(见例3-19)。有时,如果传递的类里面包含Python 不知道如何序列化传输的对象,也会
导致你的程序失败。
传递函数时需要小心的一点是,Python 会在你不经意间把函数所在的对象也序列化传出
去。当你传递的对象是某个对象的成员,或者包含了对某个对象中一个字段的引用时,Spark 就会把整个对象发到工作节点上,这可能比你想传递的东西大得多。
如传递一个带字段引用的函数(别这么做!)
class SearchFunctions(object):
def __init__(self, query):
self.query = query
def isMatch(self, s):
return self.query in s
def getMatchesFunctionReference(self, rdd):
# 问题:在"self.isMatch"中引用了整个self
return rdd.filter(self.isMatch)
def getMatchesMemberReference(self, rdd):
# 问题:在"self.query"中引用了整个self
return rdd.filter(lambda x: self.query in x)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
替代的方案是,只把你所需要的字段从对象中拿出来放到一个局部变量中,然后传递这个
局部变量。
传递不带字段引用的Python 函数
class WordFunctions(object):
...
def getMatchesNoReference(self, rdd):
# 安全:只把需要的字段提取到局部变量中
query = self.query
return rdd.filter(lambda x: query in x)
- 1
- 2
- 3
- 4
- 5
- 6
scala
在Scala 中,我们可以把定义的内联函数、方法的引用或静态方法传递给Spark,就像
Scala 的其他函数式API 一样。我们还要考虑其他一些细节,比如所传递的函数及其引用
的数据需要是可序列化的(实现了Java 的Serializable 接口)。除此以外,与Python 类似,
传递一个对象的方法或者字段时,会包含对整个对象的引用。这在Scala 中不是那么明显,
毕竟我们不会像Python 那样必须用self 写出那些引用。类似在例3-20 中对Python 执行
的操作,我们可以把需要的字段放到一个局部变量中,来避免传递包含该字段的整个对
象。
Scala 中的函数传递
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
RDD编程 | 29
// 问题:"isMatch"表示"this.isMatch",因此我们要传递整个"this"
rdd.map(isMatch)
}
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
// 问题:"query"表示"this.query",因此我们要传递整个"this"
rdd.map(x => x.split(query))
}
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
// 安全:只把我们需要的字段拿出来放入局部变量中
val query_ = this.query
rdd.map(x => x.split(query_))
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
如果在Scala 中出现了NotSerializableException,通常问题就在于我们传递了一个不可序列
化的类中的函数或字段。记住,传递局部可序列化变量或顶级对象中的函数始终是安全的。
常见的转化操作和行动操作
基本RDD(受任意数据类型的RDD 支持的转化操作和行动操作)
1. 针对各个元素的转化操作
- map() : map() 的返回值类型不需要和输。
入类型一样 - filter() :根据规则筛选元素。
- flatMap() : 每个输入元素生成多个输出元素,返回值是一个返回值序列的迭代器。得到的是一个包含各个迭代器可访问的所有元素的RDD。flatMap() 的一个简
单用途是把输入的字符串切分为单词。
map 和flatmap的区别如下:
例子:map() 对RDD 中的所有数求平方
python中:
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
print "%i " % (num)
- 1
- 2
- 3
- 4
scala中:
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))
- 1
- 2
- 3
2. 伪集合操作
尽管RDD 本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作。如下图所示:
注意,这些操作都要求操作的RDD是相同数据类型的。
另外distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份。
3. 行动操作
- reduce : 接收一个函数作为参数,这个
函数要操作两个RDD 的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就
是函数+,可以用它来对我们的RDD 进行累加。 - fold : fold() 和reduce() 类似,接收一个与reduce() 接收的函数签名相同的函数,再加上一个
“初始值”来作为每个分区第一次调用时的结果。 - aggregate() : 函数返回值类型不必与所操作的RDD类型相同。与fold() 类似,使用aggregate()时,需要提供我们期待返回的类型的初始值。如可用来计算RDD 的平均值。
例子:
python中:
#函数+
sum = rdd.reduce(lambda x, y: x + y)
#平均值
sumCount = nums.aggregate((0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])
- 1
- 2
- 3
- 4
- 5
- 6
- 7
scala中:
//函数+
val sum = rdd.reduce((x, y) => x + y)
//平均值
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
- 1
- 2
- 3
- 4
- 5
- 6
- 7
RDD 的一些行动操作会以普通集合或者值的形式将RDD的部分或全部数据返回驱动器程序中。
- collect() :把数据返回驱动器程序中最简单、最常见的操作是collect(),它会将整个RDD 的内容返回。collect() 通常在单元测试中使用。使用collect() 使得RDD 的值与预期结果之间的对比变得很容易。由于需要将数据
复制到驱动器进程中,collect()要求所有数据都必须能一同放入单台机器的内存中。
- take(n) : 返回RDD中的n个元素,并且尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。需要注意的是,这些操作返回元素的顺序与你预期的可能不一样。
- top() : 如果为数据定义了顺序,就可以使用top()从RDD中获取前几个元素。top()会使用数据的默认顺序,但我们也可以提供自己的比较函数,来提取前几个元素。
- takeSample(withReplacement, num,seed) : 在驱动器程序中对我们的数据进行采样。takeSample(withReplacement, num,seed) 函数可以让我们从数据中获取一个采样,并指定是否替换。
- foreach() :有时我们会对RDD中的所有元素应用一个行动操作,但是不把任何结果返回到驱动器程序中,这也是有用的。比如可以用JSON 格式把数据发送到一个网络服务器上,或者把数据存到数据库中。不论哪种情况,都可以使用foreach()行动操作来对RDD中的每个元素进行操作,而不需要把RDD 发回本地。
更多操作:
4. 在不同RDD类型之间转换
有些函数只能用于特定类型的RDD,比如mean()和variance()只能用在数值RDD 上,而join() 只能用在键值对RDD 上。
在Scala 中,将RDD转为有特定函数的RDD(比如在RDD[Double]上进行数值操作)是由隐式转换来自动处理的。上面提到过,我们需要加上import org.apache.spark.SparkContext._来使用这些隐式转换。这些隐式转换可以隐式地将一个RDD转为各种封装类,比如DoubleRDDFunctions(数值数据的RDD)和PairRDDFunctions(键值对RDD),这样我们就有了诸如mean() 和variance() 之类的额外的函数。
Python 的API 结构与Java和Scala有所不同。在Python中,所有的函数都实现在基本的RDD 类中,但如果操作对应的RDD数据类型不正确,就会导致运行时错误。
持久化
为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark 持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。
出于不同的目的,我们可以为RDD选择不同的持久化级别。在Scala和Java 中,默认情况下persist() 会把数据以序列化的形式缓存在JVM 的堆空
间中。在Python 中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在JVM 堆空间中。
在Scala 中使用persist()
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
- 1
- 2
- 3
- 4
如果要缓存的数据太多,内存中放不下,Spark 会自动利用最近最少使用(LRU)的缓存
策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到
已经被移除的分区时,这些分区就需要重新计算。但是对于使用内存与磁盘的缓存级别的
分区来说,被移除的分区都会写入磁盘。不论哪一种情况,都不必担心你的作业因为缓存
了太多数据而被打断。不过,缓存不必要的数据会导致有用的数据被移出内存,带来更多
重算的时间开销。
RDD 还有一个方法叫作unpersist(),调用该方法可以手动把持久化的RDD 从缓
存中移除。