Actions Available on Pair RDDs (键值对RDD可用的action)
和transformation(转换)一样,键值对RDD也可以使用基础RDD上的action(开工),并且键值对RDD有一些利用键值对数据特性的的action,如下表:
表4-3 键值对RDD上的action
函数名 | 描述 | 例子 | 结果 |
---|---|---|---|
countByKey() | 计算每个键元素的总数 | rdd.countByKey() | {(1,1),(3,2)} |
collectAsMap() | 结果收集成一个map便于查询 | rdd.collectAsMap() | Map{(1,2),(3,4),(3,6)} |
lookup(key) | 根据键返回值 | rdd.lookup(3) | [4,6] |
键值对RDD还有很多其他保存RDD的action,我们将在第五章进行讨论。
Data Partitioning(Advanced)(数据分区)
我们这一章讨论的最后一个Spark特性就是如何控制节点间的数据分区。在分布式程序中,主机间的通信代价高昂,所以把数据安排妥当来最小化网络间的通信可以极大地提高性能。很像单台机器的程序需要为数据选择正确的数据结构,Spark能够控制RDD的分区来减少网络间通信。分区不会对所有的应用都有用,举个例子,如果给定的RDD值只被扫描一次,那么预先对其分区没有什么意义。只有多次使用如join这样的的键操作的RDD,分区才有意义。稍后会有一些例子。
Spark中的所有键值对RDD都可以使用分区,因为系统的分组函数是根据每个元素的键。尽管Spark没有明确地控制每个键所对应的工作节点(也因为系统在某些工作节点失败的情况下也能正常运行),它允许程序能够确保一组键会出现在同一个节点上。例如,你可以选择哈希分区(hashpartition)将一个RDD划分成100个分区,这样模除100后有相同哈希值的键会出现在一个节点上。或者你可以使用区间分区(range-partition)按区间对键进行分区,这样键在相同范围内的元素会在相同的节点上。
举个简单例子,想想一个内存中保存大量用户信息的应用,(UserId,UserInfo)组成的键值对RDD,UserInfo包含用户话题订阅列表。这个应用定期把这个表和一个记录了过去五分钟发生的点击事件的小文件结合,就是一个(UserId,LinkInfo)键值对,记录了用户五分钟内点击网站链接的信息的日志。举例来讲,我们想统计用户访问和他们订阅主题无关的链接数量。我们可以执行Spark的join操作,把UserInfo和LinkInfo键值对根据UserId键分组。Example4-22展示了这个例子:
Example 4-22. Scala simple application
// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn't provide Spark with any way of knowing in which partition a
// particular UserID is located.
//初始化代码;我们从HDFS的Hadoop SequenceFile加载用户信息,它通过他们找到的HDFS block来分发userData元素,Spark并不知道每个UserId在分区中的位置
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// Function called periodically to process a logfile of events in the past 5 minutes;
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
//函数定期被调用处理过去五分钟的事件日志,(假定这个SequenceFIle包含(UserId,Link//Info)键值对
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
!userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
这段代码能够达到我们的目的,但是效率会很差。这是因为每次调用processNewLogs()
时调用的join()
操作都不知道键在数据集的分区方式。默认情况下,这些操作会用hash值混洗两个数据集的所有键,把具有相同哈希值的键发送到相同的机器中,然后在这台机器上join相同键的元素(如图4-4)。因为我们知道userData表比每五分钟的点击事件日志大很多,这浪费了大量工作:userData表每次被调用都要在通过网络把数据打乱再用哈希值对键分组,有时候用户表甚至没有变化也要这样做。
改正其实很简单:在程序开始时对userData使用partitionBy()
转换(transformation)来把数据进行哈希分区。还需传递一个spark.HashPartitioner
对象给partitionBy
,如例所示:
Example 4-23. Scala custom partitioner
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // Create 100 partitions
.persist()
processNewLogs()
可以保持不变:事件RDD是从本地进入的processNewLogs()
方法,并且在本方法中只被使用了一次,所以时间RDD被指定分区就没什么好处。因为我们构建userData时调用了partitionBy()
,Spark会立即知道它被哈希分区了,调用join()
时会利用这些信息。特别是当我们调用userData.join(events)
时,Spark会只混洗eventsRDD,向包含用户数据的相应哈希分区的机器发送带有每个特定UserId的事件(如图4-5)。结果就是只有少量数据需要在网络中通信,程序极大提高速度。
注意一点partitionBy()
是transformation(转换),所以他总是返回一个新RDD,原始RDD不会被改变。RDD一旦创建可以永不改变。因此,持久化并且保存partitionBy()
后的userData的结果是很重要的,而不是保存原始的sequenceFile()
。并且,把100传递给partitionBy()表示分区的数量,这会控制相同数量的并行task(任务)在RDD执行后续的操作(例如:join);通常,这个数量至少和集群上的核心数一样大。
在partitionBy()之后未能持久化会导致后续使用RDD重复对数据分区。没有持久化,已分区RDD的使用将会导致对RDD完整继承关系的重新求导。这是
partitionBy()
的弊端,这会导致跨网络的重复分区和数据洗牌,类似于没有指定分区的情况。
实际上,Spark的许多操作会自动生成附加分区信息的RDD,并且很多操作会利用这些分区信息,除了join()
。举个例子,sortBykey()
和groupByKey()
会分别生成区间分区和哈希分区。另一方面,类似map()
操作产生的新RDD会忘记父RDD的分区信息,因为这种操作理论上有可能修改每条记录的键信息。后面部分会介绍如何决定RDD分区,和Spark不同的操作如何影响分区。
Java,Python和Scala三者的API受益于分区的方式并无二致。但是,在Python中,你不能把一个
Hash Partitioner
对象传递给partitionBy
;你可以直接传递分区要求的数量(例如:rdd.partitionBy(100)
)。
Determining an RDD's Patitioner(决定RDD的分区器)
在Scala和Java中,你可以通过partitioner
的属性决定RDD如何分区(或者Java中的partitioner()
方法)。这回返回一个scala.Option
对象,一个Scala中包含可能存在可能不存在对象的容器类。你还可以调用Option
对象的isDefined()
来检查是否有值,get()
方法返回这个值。如果有值,会是一个spark.Partioner
对象。这本质上是一个表示RDD每个键的分区的函数;稍后会详细介绍。
利用partitioner
属性是个在Spark shell中测试Spark不同操作对分区影响的好手段,还能够检查你想在程序中执行的操作是否符合正确的结果(见Example4-24)。
Example 4-24. Determining partitioner of an RDD
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> pairs.partitioner
res0: Option[spark.Partitioner] = None
scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
在这个简单的shell会话中,我们创建了(Int,Int)
键值对RDD,其初始化是没有任何分区信息(Option对象的值是None)。然后我们通过对第一个RDD哈希分区创建了第二个RDD。如果我们实际上想在后面的操作使用已定义的partitioned
,我们应该在例子第三行输入的末尾加上persist()
。这和在之前例子中需要对userData使用persist()
的原因是相同的:如果不适用persist()
,后续RDD的action计算分区的整个继承关系,这会导致键值对被一遍又一遍地哈希分区。
Operations That Benefit from Partitioning(从分区获益的操作)
很多Spark的操作会导致在网络间根据键对数据洗牌。这些操作都可以通过分区进行优化。像Spark1.0,通过分区可以优化的操作有:cogroup(),groupWith(),join(),lefOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey()和lookup()
。
对于运行单独RDD上的操作,如reduceByKey()
,运行一个分区RDD会导致每个键的所有值在单个机器上计算,只需要最后把本地归约的值从工作节点发送回主节点。对于二元运算,如cogroup()
和join()
,预分区(pre-partitioning)会导致至少一个RDD(已知分区器的RDD)不被混洗。如果RDD都是相同的分区器并且缓存在相同的机器上或者其中之一仍未被计算,那么不会发生网络间的数据洗牌。
Operations That Affect Partitioning(影响分区的操作)
Spark内部知道操作是如何影响分区的,自动对会为数据分区的操作创建的RDD设置分区器。举例来讲,假设你调用了join()
来连接两个RDD;因为有相同键的元素已经被哈希分区到相同的机器上了,Spark知道结果就是哈希分区,在join
产生的结果上的操作如reduceByKey()
会明显变快。
另一方面,有些无法确保会生成已知分区的转换,输出的RDD不会有分区器集。举例来说,如果你对一个哈希分区的键值对RDD调用map()
,map()
中的函数参数理论上可以改变每个元素的键,所以结果不会包含分区器。Spark不会分析你的函数来检查是否改变了键,而是提供了两个操作,mapvalues()
和flatMapValues()
来保证每个元组的键未被改变。
总结一下,所有会输出分区器的操作:cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(),mapValues() (如果父RDD有分区器), flatMapValues() (如果父RDD有分区器), and filter() (如果父RDD有分区器)
。剩下的操作不会产生分区器。
对于二元操作,输出分区器的设置取决于父RDD的分区器。默认情况下,使用哈希分区器,分区的数量由操作的并行度确定。但是,如果父RDD其中之一有分区器集,那该分区器会设置为分区器,如果所有的父RDD都有分区器集,那么设置分区器为第一个父分区器。
Example:PageRank(例:PageRank)
我们认为PageRank(网页排名)算法是一个典型的会因分区提升效率的例子。PageRank算法是以谷歌的Larry Page命名的,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。这可以用来为网页排名,也可以是论文或用户影响力。
PageRank是一个会执行很多join的迭代算法,所以是使用RDD分区的好样本。这个算法包含两个数据集:一个是(pageId,linkList),包含每个网页的邻居列表(该页面包含其他页面的链接,这个链接页面称为邻居页面);另一个是(pageID,rank),包含每个网页的当前排名。它的计算流程大致如下:
1.把每个页面的初始级别设置为1.0.
2.每次迭代,页面p发送rank(p)/numNeighbors(p)
的贡献给它的邻居网站(它有链接的页面)。
3.设置页面p级别为0.5+0.85*收到的贡献。
最后两步重复迭代多次,在这个过程中,算法会渐渐收敛到每个网页正确的PageRank值。实际上,通常进行十次迭代。
Example 4-25 gives the code to implement PageRank in Spark.
Example 4-25. Scala PageRank
// Assume that our neighbor list was saved as a Spark objectFile
//假设邻页列表存在了Spark的objectFile中。
val links = sc.objectFile[(String, Seq[String])]("links")
.partitionBy(new HashPartitioner(100))
.persist()
// Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD
// will have the same partitioner as links
//把每个页面的初始值设为1,因为使用mapValue,所以RDD会有和链接相同的分区器
var ranks = links.mapValues(v => 1.0)
// Run 10 iterations of PageRank
//运行十遍PageRank的迭代
for (i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size))
}
ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
// Write out the final ranks
ranks.saveAsTextFile("ranks")
就是这样!算法开始时,rankRDD对每个元素设置1.0的初始值,每次迭代都会更新rank变量。PageRank算法在Spark中表达非常简单:首先将当前等级RDD和静态链接RDD通过join()
结合,这是为了得到(页面ID,链接列表,页面等级)元组,然后使用flatMap
生成“贡献”值发送给每个邻居页面。我们把贡献值按页面ID求和并且设置页面的等级为0.15+0.85*收到的贡献
。
虽然代码本身很简单,但是这个例子为了确保使用高效的方式分区RDD和最小化网络通信做了很多事情:
1.注意每次迭代都是links.join(ranks)
。因为links是一个静态数据集,程序开始就用partitionBy
对它分区了,所以这个RDD并不需要在网络间对数据洗牌。实际上,linksRDD可能会比ranksRDD大很多,因为它保存了每个页面的邻居页面列表,所以这种优化比简单地实现PageRank(如,使用简单的MapReduce)减少了大量的网络任务。
2.同理,我们把linksRDDpersist()
避免对其迭代。
3.当我们第一次创建ranksRDD时,我们使用mapValues()
而不是map()
来保存父RDD(links)的分区信息,所以我们第一次join
开销很小。
4.再循环体中,我们在reduceByKey()
之后执行mapValues()
;因为reduceByKey()
的结果已经被哈希分区了,这将使得将map
的结果与下一次迭代中的链接结合起来更有效率。
为了最大限度地发挥分区优化的潜力,当你不改变键的值时应该使用
mapValues()
或flatMapValues()
。
Custom Partitioners(定制分区器)
尽管哈希分区器和区间分区器可以在很多场景使用,Spark仍然允许你通过提供一个定制的Partitioner
对象来自定义RDD分区方式。这可以帮助你利用特定领域的知识来减少网络通信的消耗。
举例来说,假如我们想使用PageRank算法计算一组web页面,以页面URL做RDD的键,即PageID为URL,使用哈希分区的话,域名相同后缀不相同的URL不会在一个分区(如,http://www.cnn.com/WORLD 和 http://www.cnn.com/US)。我们知道同一个域名中的链接往往彼此连接。由于PageRank需要在每次迭代时将每个页面的消息发送给每个邻居,因此定制分区器有助于将这些页面分组到相同的分区中。我们可以定制一个分区器把相同域名的URL分区到一个节点上。
定制分区器需要是org.apache.spark.Partitioner
的子类并且实现三个方法:
-
numPartitions:Int,返回你创建分区的数量。
-
getPartition(key:Any):Int,返回对应键的分区ID(0到numPartitions-1)。
-
equals(),标准Java相等方法。这个实现很重要,因为Spark需要测试你的分区器与其它实例是否相等来判断两个RDD的分区是否是一种方式。
有一点需要注意的是如果你的算法中依赖了Java的hashCode()
方法,这有可能返回一个负数。你需要确保getPartition()
不会返回负数。
Example4-26展示了一个我们之前描述的域名分区器,这个分区器只对每个URL的域名进行哈希分区。
Example 4-26. Scala custom partitioner
class DomainNamePartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
val domain = new Java.net.URL(key.toString).getHost()
val code = (domain.hashCode % numPartitions)
if (code < 0) {
code + numPartitions // Make it non-negative
} else {
code
}
}
// Java equals method to let Spark compare our Partitioner objects
override def equals(other: Any): Boolean = other match {
case dnp: DomainNamePartitioner =>
dnp.numPartitions == numPartitions
case _ =>
false
}
}
注意在equals()
方法中,我们使用Scala的模式匹配操作测试other
是否是一个DomainNamePartitioner
对象,如果是的就跳进里面的方法;这和使用Java的intanceof是一样的。
使用定制分区器非常简单:把它传给partitionBy()
方法就行了。Spark中很多基于数据洗牌的操作,如join()
和groupByKey()
,也可以使用可选的Partitioner
对象来控制分区的输出。
在Java中创建一个定制分区器和Scala很相似:直接继承spark.Partitioner
类并且实现需要的方法就行。
在Python中,你不需要继承Partitioner
类,但是需要给RDD.partitionBy()
方法传递一个哈希函数作为额外的参数。示例如下:
Example 4-27. Python custom partitioner
import urlparse
def hash_domain(url):
return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(20, hash_domain) # Create 20 partitions
注意一点你传递的哈希函数会作为和其他RDD比较的标识。如果你想使用相同的分区器对多个RDD分区,那么需要传递相同的函数对象(如,全局函数)而不是为每个创建一个lambda表达式。
Conclusion(总结)
本章中,我们了解了Spark中处理键值对数据的特殊函数。第三章学到的技术对键值对仍然适用。下一章节,我们将了解如何加载保存数据。