Common Transformations and Actions
本章中,我们浏览了Spark中大多数常见的transformation(转换)和action(开工)。在包含特定数据类型的RDD上可以进行额外的操作,例如,可以对纯数字RDD使用统计函数,对键值对的RDD进行聚合操作。后面的章节我们会介绍这些特别的操作和RDD类型间的转换。
Basic RDD (基础RDD)
首先,在忽略数据的影响的前提下,我们将描述所有的RDD上可以执行的transformation和action。
element-wise transformation(逐元素的转换)
你可能会使用的两种最常见的转换是map()
和filter()
(见图3-2)。map()转换接受一个函数参数,RDD中的每个元素都会经过map中的函数处理,生成新的元素组成的RDD。filter()
转换接受一个函数参数,并且返回一个能通过filter()
函数的元素组成的RDD。
图3-2,输入RDD到mapRDD和filterRDD
map()
函数可以做太多太多事情了,如:抓取网站关联的每个URL放入集合,对数字进行平方。非常有用的一点是map()
的返回值不必须和输入类型一样,这样如果你有个String类型的RDD并且map()
函数会把字符串转换成Double类型返回,我们的输入RDD会是RDD[String],结果RDD会是RDD[Double]。
看一个基础的例子,通过map()对数字平方。(Example3-26到3-28):
Example 3-26. Python squaring the values in an RDD
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
print "%i " % (num)
Example 3-27. Scala squaring the values in an RDD
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))
Example 3-28. Java squaring the values in an RDD
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));
有时我们想为每个输入元素产生多个输出元素。这种操作叫做flatMap()
。和
map()
一样,RDD中的每个元素都会被flatMap()
中的函数调用。这个操作不是返回一个元素,而是返回一个迭代器,其中包含要返回的值。我们没有生成迭代器的RDD,而是返回一个由所有迭代器中元素组成的RDD。flatMap()
一个简单的用处就是把输入的字符串切割成单词,示例:
Example 3-29. flatMap() in Python, splitting lines into words
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # returns "hello"
Example 3-30. flatMap() in Scala, splitting lines into multiple words
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // returns "hello
Example 3-31. flatMap() in Java, splitting lines into multiple words
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
return Arrays.asList(line.split(" "));
}
});
words.first(); // returns "hello"
在图3-3中有map()
和flatMap()
差异的图解。你可以把flatMap()
看做返回压平了的迭代器(如Example中展示,flatMap中的函数把传入的字符串切割成了单词数组,但是返回的是一个个单词组成的RDD。传入flatMap()
的函数虽然看起来是把数据转换成列表,但列表被"压平"了,所以返回的RDD是列表元素组成的RDD),所以当操作结束返回的不是一个列表RDD,而是列表中每个元素组成的RDD。
Pseudo set operations(伪集操作)
RDD本身不是很标准的集合,但RDD支持很多数学集合操作,如交集(intersection)和联合(union)。图3-4有展示了四个操作。很重要的一点是,所有的操作都需要使用相同类型的RDD。
我们说RDD不是标准的集合是因为不能满足集合的元素唯一性,因为RDD中经常有元素的重复。如果想要去重,我们可以使用RDD.distinc()
这个transformation(转换)来产生一个新的没有重复元素的RDD。注意distinct()的代价非常高昂,因为它会在网络上整理(shuffling)所有的数据来确保我们只收到每个元素的唯一副本。我们会在第四章来详细讨论洗牌(shuffling)和如何避免洗牌。
最简单的集合操作是union(other)
,会返回由两个源数据组成的RDD。这在许多用例中都很有用,例如从多个来源处理日志文件。不像数学中的并集操作,如果在输入RDD中有重复的元素,Spark的union操作会包含重复的元素(当然我们可以使用distinct来修正)。
Spark也提供了intersection(other)
方法,会返回两个RDD中共有的元素。intersection()
运行时会删除所有重复的元素(包括单个RDD中的重复元素)。尽管intersection()和union()是很相似的概念,但是intersection()性能差很多,因为他需要整理整个网络的数据去确定共有元素。
有时候经过考虑我们需要删除一些数据。subtract(other)
函数可以从一个RDD中删除另一个RDD中包含的元素,即第一个RDD减去第二个RDD的差。与intersection()
一样,subtract会进行耗时耗力的数据洗牌(shuffle)。
我们还可以计算两个RDD的笛卡尔积,如图3-5。a是一个源RDD中的元素,b是另一个源RDD中的元素,两个RDD做笛卡尔积transformation(转换)会返回所有可能的(a,b)元素对儿。当我们想要思考所有可能的元素对儿的相似性的时候笛卡尔积就很有用,比如计算每个用户对所提供价格的期望兴趣。我们还可以对RDD自身做笛卡尔积,来实现很有用的需求如用户相似度。需要注意的是笛卡尔积在较大数据的处理上代价非常高。
解释一下笛卡尔积,笛卡尔积就是两个集合的乘积。如果两个集合c(数学,语文),s(Alice,Bob,Carl),一个课程集合,一个学生集合,两个集合的笛卡尔积就是((数学,Alice),(数学,Bob),(数学,Carl),(语文,Alice),(语文,Bob),(语文,Carl)),这就是学生集合所有选课的可能性。
表3-2和3-3总结了上述的和常用的RDD转换。
贴完图发现时英文的,我再手打一遍吧。
表3-2 包含{1,2,3,3}的RDD的基本转换
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
map() | 对RDD中的每个元素应用一个函数,返回一个RDD | rdd.map(x => x+1) | {2,3,4,4} |
flatMap() | 对RDD中的每个元素应用一个函数返回迭代器中的内容。经常用来提取单词 | rdd.flatMap(x => x.to(3) | {1,2,3,2,3,3,3} |
filter() | 返回由每个通过filter()条件的元素组成的RDD | rdd.filter(x => x!=1) | {2,3,3} |
distinct() | 删除重复元素 | rdd.distinct() | {1,2,3} |
sample(withReplacement,fraction,[seed]) | 对RDD进行替换或不替换采样 | rdd.sample,0.5) | Nondeterministic |
Sample是对rdd中的数据集进行采样,并生成一个新的RDD,这个新的RDD只有原来RDD的部分数据,这个保留的数据集大小由fraction来进行控制。
参数说明
withReplacement,这个值如果是true时,采用PoissonSampler取样器(Poisson分布),
否则使用BernoulliSampler的取样器。
Fraction,一个大于0,小于或等于1的小数值,用于控制要读取的数据所占整个数据集的概率。
Seed,这个值如果没有传入,默认值是一个0~Long.maxvalue之间的整数。
表3-2 包含{1,2,3}和{3,4,5}两个RDD的转换
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
union() | 产生由两个RDD元素组成的RDD | rdd.union(other) | {1,2,3,3,4,5} |
intersection() | 产生两个RDD共有元素的RDD | rdd.intersection(other) | {3} |
subtract() | 删除一个RDD的内容(如,删除训练数据) | rdd.subtract(other) | {1,2} |
cartesian() | 求与另一个RDD的笛卡尔积 | rdd.cartesian(othre) | {(1,3)(1,4)...(3,5)} |
Actions
在基本的RDD上最常见的action是reduce()
,它会使用一个函数操作同一RDD上两个的元素,然后返回一个新的和RDD类型相同元素。一个简单的例子就是相加函数,可以用来计算RDD的总和。使用reduce()函数,我们可以非常简单的求得RDD上元素之和,计算元素数量,或者执行其他聚合操作。示例:
Example 3-32. reduce() in Python
sum = rdd.reduce(lambda x, y: x + y)
Example 3-33. reduce() in Scala
val sum = rdd.reduce((x, y) => x + y)
Example 3-34. reduce() in Java
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) { return x + y; }
});
和reduce()
很相似的是fold()
,它和reduce()
输入函数的签名是一样的,但是除此之外,对于每个分区的初始调用它还需要一个“初始值(zero value)”。你提供的初始值应该是你操作的标识元素,也就是说,多次应用同一函数初始值不应该改变(例如,加法初始值是0,乘法初始值是1,或者元素串连成列表的初始值是个空列表)。
你可以通过修改并返回两个参数中的第一个参数来最小化fold()中的对象创建,但是,不应该修改第二个参数。
fold()
和reduce()
都需要返回结果的类型和我们操作的RDD中元素的类型一样。这种条件对于sum这种操作就很有效,但是有时候我们需要返回一种不同的类型。例如,当我们计算运行平均值时,我们需要保持记录总值和总数量,最终返回一对儿结果。我们可以使用map()
函数将每个元素转换成(元素,1),就变成了我们想要的返回类型,这样reduce()
函数可以对成对数据进行计算,顺利输出成对数据。
aggregate()
函数把我们从必须返回和操作RDD类型一样元素的约束中解脱出来。使用aggregate()
,类似fold,我们提供一个想要返回的初始值类型。然后我们提供一个元素将RDD中的元素和累加器结合起来。最后,我们需要提供第二个函数将两个累加器合并,因为每个节点都在本地累加自己的结果。
我们可以使用aggregate来计算一个RDD的平均值,避免了在fold()
函数之前使用map()
函数进行处理。示例:
Example 3-35. aggregate() in Python
sumCount = nums.aggregate((0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])
Example 3-36. aggregate() in Scala
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
Example 3-37. aggregate() in Java
class AvgCount implements Serializable {
public AvgCount(int total, int num) {
this.total = total;
this.num = num;
}
public int total;
public int num;
public double avg() {
return total / (double) num;
}
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
在RDD上的一些action将一些或所有的数据以常规集合或值的形式返回给我们的驱动程序。
最简单和常用的将数据返回给驱动程序的操作是collect()
,这将返回整个RDD的内容。collect()
常用在单元测试中,整个RDD的内容装载进内存也是在预期之中,这使得我们很容易将RDD的值和预期结果进行比较。collect()
受限于需要将所有的数据塞进内存,因为他需要把所有的数据拷贝到驱动程序。
take(n)
返回n个RDD上的元素并且尽可能地最小化访问分区的数量,所以他返回的集合可能有一些偏颇。比较重要的需要注意的一点是这些操作返回的元素可能和你预期的顺序不一样。
这些操作对于单元测试和快速debug来说很有用,但是当数据量过大是,可能会导致运行瓶颈。
如果数据有规定的顺序,我们可以使用top()
函数提取RDD最上面的元素。top()
会使用默认排序来提取数据,但我们也可以提供比较函数来提取顶端元素。
有时我们需要对驱动程序中的数据进行采样。takeSample(withReplacement,num,seed)
函数允许我们对数据使用替换或非替换采样。
有时对RDD中的所有元素进行action却不返回结果给驱动程序很有用。一个很好的例子就是把JSON数据发送给web服务器,或者将记录插入到数据库中。另一种情况,foreach()
操作允许我们对RDD中的每个元素执行计算,而不需要将其带回到本地。
除了我们讲的基本RDD操作之外,更深入的操作的函数名可读性非常好,通过他们的名字你大体就能理解他们所表现的操作方式。count()
返回元素的总数,countByValue()
返回一个唯一值的计数组成的map。表3-4总结了这些action。
Table3-4包含{1,2,3,3}RDD上的基本action
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
collect() | 返回RDD的所有元素 | rdd.collect() | {1,2,3,3} |
count() | 返回RDD元素数量 | rdd.count | 4 |
countByValue() | 每个元素在RDD中出现的次数 | rdd.countByValue() | {(1,1),(2,1),(3,2)} |
take(num) | 返回RDD中num个元素 | rdd.take(2) | {1,2} |
top(num) | 返回RDD顶端的num个元素 | rdd.top(2) | {3,3} |
takeOrdered(num)(ordering) | 返回基于提供的排序的num个元素 | rdd.takeOrdered(2)(myOrdering) | {3,3} |
takeSample(withReplacement,num,[seed]) | 返回num个随机元素 | rdd.takeSample(false,1) | 不确定 |
reduce(func) | 并行地把RDD中的元素结合在一起 | rdd.reduce((x,y)=>x+y) | 9 |
fold(zero)(func) | 和reduce一样但是有初始值 | rdd.fold(0)((x,y)=>x+y) | 9 |
aggregate(zeroValue)(seqOp,combOp) | 类似reduce但是用来返回一个不同类型 | rdd.agregate((0,0)) ((x,y)=> (x._1+y,x._2+1), (x,y) => (x._1+y._1,x._2+y._2) |
(9,4) |
foreach(func) | RDD中每个元素都会应用在func函数上 | rdd.foreach(func) | 无 |
Converting Betwenn RDD Types(RDD之间的转换)
有些函数只能在已知RDD类型的时候使用,如mean()
和variance()
能用在数字类型的RDD,join()
用在键值对的RDD上。我们在第六章会介绍数字数据,第四章介绍键值对RDD。在Sacla和Java中,这些方法没有在标准RDD类中定义,所以为了使用这些额外功能我们必须确保得到了正确的专门化类。
Scala
在Scala中,用特殊的函数转换RDD(如,在Double类型的RDD上声明数字(numberic)函数),特殊的函数是指使用隐式转换自动处理。之前在17页提到过的“初始化SparkContext”为了这些转换工作,我们需要添加import org.apache.spark.SparkContext._
。你可以在SparkContext 对象文档中查看隐式转换的清单(scala的隐式转换很好用噢)。这些隐式转换把RDD转换成不同的包装类,比如DoubleRDDFunctions(包装数字数据的RDD)和PairRDDFunctions(包装键值对),用来提供附加功能如mean()
和variance()
。
隐式转换功能非常强大,但有时会让人困惑。如果你可以在RDD上调用一个mean()
函数,你可能查看了RDD类的Scala文档并且注意到没有mean()
函数。但是调用却因为RDD[Double]隐式转换成DoubleRDDFunctions成功了。当在你的RDD的Scala文档中查找函数式,确定一下是否在包装类中能够使用这些函数。
Java
在JAVA中转换两个特别类型的RDD就比较明显。特别地,对于这些类型的RDD,有专门的类成为JavaDoubleRDD和JavaPairRDD,这些类型的数据提供了额外的方法。这中方法的一个有点就是你可以很清楚的理解到底发生了什么,但是这种方式可能有点麻烦。
要构建这些特殊类型的RDD,而不是总是使用函数类,我们将需要使用专门的版本。如果我们想从T泛型的RDD中创建一个DoubleRDD,我们使用DoubleFunction<T>
而不是Function<T,Double>
。表3-5展示了这些特殊化函数和他们的用法。
我们还需要在RDD上调用不同的函数(这样我们不能创建一个Double函数并传递给map()
函数)。当我们需要一个DoubleRDD时,不是调用map()
进行转换,而我们需要调用mapToDouble()
,所有其他的函数都遵循相同的模式。
表3-5特定类型函数的Java接口
函数名 | 等价函数 | 用法 |
---|---|---|
DoubleFlatMapFunction<T> | Function<T,Iterable<Double>> | 从flatMapToDouble中得到DoubleRDD |
DoubleFunction<T> | Function<T,double> | 从mapToDouble中得到DoubleRDD |
PairFlatMapFunction<T,K,V> | Function<T,Iterable<Tuple2<K,V>>> | 从flatMapToPair中得到PairRDD<K,V> |
PairFunction<T,K,V> | Function<T,Tuple2<K,V>> | 从mapToPair中得到PairRDD<K,V> |
我们可以修改一下Example3-28,对RDD中的元素平方,产生一个JavaDoubleRDD,如Example3-38。这能够使我们访问额外的DoubleRDD特定的函数功能如`mean()`和`variance()`。
``` Example 3-38. Creating DoubleRDD in Java JavaDoubleRDD result = rdd.mapToDouble( new DoubleFunction
Python的API和Java、Scala的结构不同。在Python中所有的函数都是在基本RDD类上实现的,但是如果RDD的类型不正确,运行时会失败。
### Persistence(Caching)
向我们之前讨论的,Spark的RDD是惰性求值的,并且我们有时会希望多次使用同一个RDD。如果我们天真地这样做,每当我们调用一个action,SPark将重新计算RDD和他所有的依赖项。对于迭代算法,这可能导致非常高昂的代价,因为它要多次查看数据。Example3-39展示了另一个小例子,统计并写出相同的RDD。
``` Example 3-39. Double execution in Scala val result = input.map(x => x*x) println(result.count()) println(result.collect().mkString(",")) ``` 为了防止对一个RDD计算多次,我们可以让Spark持久化数据。当我们让Spark持久化一个RDD时,计算RDD的节点将存储他们的分区。如果一个将数据持久化了的节点失败了,Spark会在需要时重新计算剩下数据的分区。如果我们希望处理节点失败时避免机器卡顿,可以将数据复制在多个节点上。
Spark针对我们的目的提供了多种可供选择的持久化级别,如表3-6。在Scala和Java中,默认的`persist()`会如不可序列化的对象一样把数据保存在JVM的堆中。在Python中,我们总是通过序列化数据来持久化存储,所以默认的是pickle(python中的序列化)对象而不是保存在JVM堆中。当我们将数据写入磁盘或非堆存储时,这些数据总是要被序列化。
表3-6.org.apche.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级别。如果需要我们可以通过在存储级别尾部添加_2 来在两台机器上复制数据。
堆外缓存是实验性的并且使用的是Tachyon。如果你对Spark的堆外缓存感兴趣,可以查看一下Running Spark on Tachyon guide。
Example 3-40. persist() in Scala
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
需要注意的是我们在第一次action之前调用了RDD的persist()
。persist()
调用本身不强制求值。
如果你试图在内存中缓存太多数据,Spark会使用最近最少使用的缓存策略来自动清除老分区。对于MEMORY_ONLY存储级别,它会在它们下一次访问是重新计算这些分区,对于MEMORY_AND_DISK级别,会把它们写入磁盘。这意味着你不用担心如果你缓存太多数据而导致job的崩溃。但是,这会有大量重复计算的时间。
最后,RDD还有一个unpersist()
方法,可以让你手动将RDD从缓存中删除。
Conclusion
这一章中,我们介绍了RDD的操作模型和大量的常用操作。如果你看到这,恭喜你已经学习了所有SPark中的核心概念。下一章中,我们将介绍关于键值对RDD上可使用的特殊操作集合,这些操作在并行条件下聚合和分组数据非常常用。在此之后,我们将讨论各种数据源的输入和输出,以及使用SparkContext的高级话题。