这是CS100.1x第一个提交的有意义的作业,自己一遍做下来对PySpark的基本应用应该是可以掌握的。相关ipynb文件见我github。
这次作业的目的如题目一样——word count,作业分成4个部分,遇到不懂的地方,时刻记得查API。
Part 1 Creating a base RDD and pair RDDs
Create a base RDD
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print type(wordsRDD)
我们通过sc.parallelize方法来把python的list创建为RDD,打印的结果是
<class 'pyspark.rdd.RDD'>
我们可以看到,wordsRDD已经变成了pyspark.rdd.RDD类
Pluralize and test
下面几个将介绍map()的用法。注释里面带有TODO的就表示这里是作业,具体的功能实现在函数的注释里有。测试用例我就不在这里写了,大家可以去我的github上下载源文件。我这里重点讲功能实现。
# TODO: Replace <FILL IN> with appropriate code
def makePlural(word):
"""Adds an 's' to `word`.
Note:
This is a simple function that only adds an 's'. No attempt is made to follow proper
pluralization rules.
Args:
word (str): A string.
Returns:
str: A string with 's' added to it.
"""
return word + 's'
print makePlural('cat')
# One way of completing the function
def makePlural(word):
return word + 's'
print makePlural('cat')
这里的函数实现就仅仅把单数变复数,所以在字符串上加's'就行。
Apply makePlural to the base RDD
# TODO: Replace <FILL IN> with appropriate code
pluralRDD = wordsRDD.map(makePlural)
print pluralRDD.collect()
我可以看到map()里放入的是一个函数,作用是对RDD里的每个元素实现该函数的操作。
Pass a lambda function to map
# TODO: Replace <FILL IN> with appropriate code
pluralLambdaRDD = wordsRDD.map(lambda x: x + 's')
print pluralLambdaRDD.collect()
这里的知识点是map和lamda的结合,在后面我们会越来越多的和lamda打交道,因为很多函数我们仅仅只用一次,所以lambda来定义匿名函数是再适合不过了。
这里lamda后面的x可以是任何值,是占位符的作用,表示RDD里的任何一个元素,冒号后面的表示你要对该元素的操作。就这么简单。
Length of each word
# TODO: Replace <FILL IN> with appropriate code
pluralLengths = (pluralRDD
.map(lambda x:len(x))
.collect())
print pluralLengths
这里要返回每个元素的长度,所以直接返回len(x)就行。
Pair RDDs
这里要介绍的是pair RDD,也就是key-value的RDD。python中,用tuple来实现。题目要求是把原来的RDD里的值变成(word, 1)的样子,用lambda很好解决。
# TODO: Replace <FILL IN> with appropriate code
wordPairs = wordsRDD.map(lambda x: (x,1))
print wordPairs.collect()
Part 2 Counting with pair RDDs
groupByKey() approach
我们先用groupByKey()来进行count word的操作。这个函数会把RDD里面相同的key放到一个list里面,然后存在一个数据块上。但是这样存在两个问题:
- 这种操作涉及到大量不同机器不同数据块的数据移动
- 生成的list会很大,可能对某一个worker来说,负担太重
# TODO: Replace <FILL IN> with appropriate code
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.groupByKey()
for key, value in wordsGrouped.collect():
print '{0}: {1}'.format(key, list(value))
我们看到,用法其实很简单,直接使用就行。但是最后的结果是这样,并没有把结果相加
[('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])]
Use groupByKey() to obtain the counts
我们看到上面的结果是一个key对应了一个list,而并非是真正的结果,所以这里还需要进一步操作,计算list的长度,或者求和都可以。
# TODO: Replace <FILL IN> with appropriate code
wordCountsGrouped = wordsGrouped.mapValues(lambda x : sum(list(x)))
print wordCountsGrouped.collect()
这里我用了一个新的方法mapValues(),这个方法是直接传入value进行计算,不改变key的值。
Counting using reduceByKey
reduceByKey()相比于groupByKey()就有效得多。因为它是在本地计算好了后,进行reduce操作,类似与MapReduce里的combiner。其实这里还可以这么写
wordCounts = wordPairs.reduceByKey(add)
但是这么不让from operator import add,所以写成lambda表达式。
All together
把前面所有的知识点写成一句。。。
# TODO: Replace <FILL IN> with appropriate code
wordCountsCollected = (wordsRDD
.map(lambda x: (x,1))
.reduceByKey(lambda a,b: a+b)
.collect())
print wordCountsCollected
Part 3 Finding unique words and a mean value
Unique words
这里是计算RDD里不同单词的数目。我们在前面已经reduce过了,实际上把相同的合并了,这里只需要计算法RDD里的元素个数就行
# TODO: Replace <FILL IN> with appropriate code
uniqueWords = wordCounts.count()
print uniqueWords
Mean using reduce
这里是要把所有的value加起来,然后除以元素的个数,也就是平均每个元素有多少次。用到了一个新的函数values(),这个方法直接返回所有value的list。
# TODO: Replace <FILL IN> with appropriate code
from operator import add
totalCount = (wordCounts
.values()
.reduce(add))
average = totalCount / float(wordCounts.count())
print totalCount
print round(average, 2)
Part 4 Apply word count to a file
这里是综合了前面所有的知识,来对一个文本文件进行处理,还涉及到文本文件处理的一些方法。
wordCount function
# TODO: Replace <FILL IN> with appropriate code
def wordCount(wordListRDD):
"""Creates a pair RDD with word counts from an RDD of words.
Args:
wordListRDD (RDD of str): An RDD consisting of words.
Returns:
RDD of (str, int): An RDD consisting of (word, count) tuples.
"""
return wordListRDD.map(lambda x: [x,1]).reduceByKey(lambda a,b: a+b)
print wordCount(wordsRDD).collect()
Capitalization and punctuation
真实的文本文件远比我们之前碰到的例子复杂。我们需要考虑三个问题:
- 单词里的大小写,例如Spark和spark应该是同一个单词
- 标点符号要去掉
- 单词前后的空格也要去掉
# TODO: Replace <FILL IN> with appropriate code
import re
def removePunctuation(text):
"""Removes punctuation, changes to lower case, and strips leading and trailing spaces.
Note:
Only spaces, letters, and numbers should be retained. Other characters should should be
eliminated (e.g. it's becomes its). Leading and trailing spaces should be removed after
punctuation is removed.
Args:
text (str): A string.
Returns:
str: The cleaned up string.
"""
return re.sub(r'[^ws]','',text).strip().lower()
print removePunctuation('Hi, you!')
print removePunctuation(' No under_score!')
这里re.sub()里面的正则表达式有点复杂。大概意思是把所有的标点都换成'',这里的r表示原生字符串,也就是不进行转义,^表示否,w和s分别表示单词和空格。所以结果是只保留单词和空格。
Load a text file
这里用到了Complete Works of William Shakespeare,来自Project Gutenberg,作为文本。
具体文件从哪里来,我们不用管,运行就行。
# Just run this code
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab1', 'shakespeare.txt')
fileName = os.path.join(baseDir, inputPath)
shakespeareRDD = (sc
.textFile(fileName, 8)
.map(removePunctuation))
print '
'.join(shakespeareRDD
.zipWithIndex() # to (line, lineNum)
.map(lambda (l, num): '{0}: {1}'.format(num, l)) # to 'lineNum: line'
.take(15))
Words from lines
我们这里要把行转为单词。上面的RDD中,每个元素都是一行。假如我们直接用split方法,最后是生成一个list,我们还需要把list解开,所以这里用flatMap()
# TODO: Replace <FILL IN> with appropriate code
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x : x.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
print shakespeareWordsRDD.top(5)
print shakespeareWordCount
Remove empty elements
在上面操作的基础上,去掉空的单词。
# TODO: Replace <FILL IN> with appropriate code
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x !='')
shakeWordCount = shakeWordsRDD.count()
print shakeWordCount
Count the words
这个就和之前的工作一样了。不多说了。
# TODO: Replace <FILL IN> with appropriate code
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15,key=lambda(w,c):-c)
print '
'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))