zoukankan      html  css  js  c++  java
  • CS100.1x-lab1_word_count_student

    这是CS100.1x第一个提交的有意义的作业,自己一遍做下来对PySpark的基本应用应该是可以掌握的。相关ipynb文件见我github

    这次作业的目的如题目一样——word count,作业分成4个部分,遇到不懂的地方,时刻记得查API。

    Part 1 Creating a base RDD and pair RDDs

    Create a base RDD

    wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
    wordsRDD = sc.parallelize(wordsList, 4)
    # Print out the type of wordsRDD
    print type(wordsRDD)
    

    我们通过sc.parallelize方法来把python的list创建为RDD,打印的结果是

    <class 'pyspark.rdd.RDD'>
    

    我们可以看到,wordsRDD已经变成了pyspark.rdd.RDD类

    Pluralize and test

    下面几个将介绍map()的用法。注释里面带有TODO的就表示这里是作业,具体的功能实现在函数的注释里有。测试用例我就不在这里写了,大家可以去我的github上下载源文件。我这里重点讲功能实现。

    # TODO: Replace <FILL IN> with appropriate code
    def makePlural(word):
        """Adds an 's' to `word`.
    
        Note:
            This is a simple function that only adds an 's'.  No attempt is made to follow proper
            pluralization rules.
    
        Args:
            word (str): A string.
    
        Returns:
            str: A string with 's' added to it.
        """
        return word + 's'
    
    print makePlural('cat')
    
    # One way of completing the function
    def makePlural(word):
        return word + 's'
    
    print makePlural('cat')
    
    

    这里的函数实现就仅仅把单数变复数,所以在字符串上加's'就行。

    Apply makePlural to the base RDD

    # TODO: Replace <FILL IN> with appropriate code
    pluralRDD = wordsRDD.map(makePlural)
    print pluralRDD.collect()
    

    我可以看到map()里放入的是一个函数,作用是对RDD里的每个元素实现该函数的操作。

    Pass a lambda function to map

    # TODO: Replace <FILL IN> with appropriate code
    pluralLambdaRDD = wordsRDD.map(lambda x: x + 's')
    print pluralLambdaRDD.collect()
    

    这里的知识点是map和lamda的结合,在后面我们会越来越多的和lamda打交道,因为很多函数我们仅仅只用一次,所以lambda来定义匿名函数是再适合不过了。

    这里lamda后面的x可以是任何值,是占位符的作用,表示RDD里的任何一个元素,冒号后面的表示你要对该元素的操作。就这么简单。

    Length of each word

    # TODO: Replace <FILL IN> with appropriate code
    pluralLengths = (pluralRDD
                     .map(lambda x:len(x))
                     .collect())
    print pluralLengths
    

    这里要返回每个元素的长度,所以直接返回len(x)就行。

    Pair RDDs

    这里要介绍的是pair RDD,也就是key-value的RDD。python中,用tuple来实现。题目要求是把原来的RDD里的值变成(word, 1)的样子,用lambda很好解决。

    # TODO: Replace <FILL IN> with appropriate code
    wordPairs = wordsRDD.map(lambda x: (x,1))
    print wordPairs.collect()
    

    Part 2 Counting with pair RDDs

    groupByKey() approach

    我们先用groupByKey()来进行count word的操作。这个函数会把RDD里面相同的key放到一个list里面,然后存在一个数据块上。但是这样存在两个问题:

    1. 这种操作涉及到大量不同机器不同数据块的数据移动
    2. 生成的list会很大,可能对某一个worker来说,负担太重
    # TODO: Replace <FILL IN> with appropriate code
    # Note that groupByKey requires no parameters
    wordsGrouped = wordPairs.groupByKey()
    for key, value in wordsGrouped.collect():
        print '{0}: {1}'.format(key, list(value))
    

    我们看到,用法其实很简单,直接使用就行。但是最后的结果是这样,并没有把结果相加

    [('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])]
    

    Use groupByKey() to obtain the counts

    我们看到上面的结果是一个key对应了一个list,而并非是真正的结果,所以这里还需要进一步操作,计算list的长度,或者求和都可以。

    # TODO: Replace <FILL IN> with appropriate code
    wordCountsGrouped = wordsGrouped.mapValues(lambda x : sum(list(x)))
    print wordCountsGrouped.collect()
    

    这里我用了一个新的方法mapValues(),这个方法是直接传入value进行计算,不改变key的值。

    Counting using reduceByKey

    reduceByKey()相比于groupByKey()就有效得多。因为它是在本地计算好了后,进行reduce操作,类似与MapReduce里的combiner。其实这里还可以这么写

    wordCounts = wordPairs.reduceByKey(add)
    

    但是这么不让from operator import add,所以写成lambda表达式。

    All together

    把前面所有的知识点写成一句。。。

    # TODO: Replace <FILL IN> with appropriate code
    wordCountsCollected = (wordsRDD
                           .map(lambda x: (x,1))
                           .reduceByKey(lambda a,b: a+b)
                           .collect())
    print wordCountsCollected
    

    Part 3 Finding unique words and a mean value

    Unique words

    这里是计算RDD里不同单词的数目。我们在前面已经reduce过了,实际上把相同的合并了,这里只需要计算法RDD里的元素个数就行

    # TODO: Replace <FILL IN> with appropriate code
    uniqueWords = wordCounts.count()
    print uniqueWords
    

    Mean using reduce

    这里是要把所有的value加起来,然后除以元素的个数,也就是平均每个元素有多少次。用到了一个新的函数values(),这个方法直接返回所有value的list。

    # TODO: Replace <FILL IN> with appropriate code
    from operator import add
    totalCount = (wordCounts
                  .values()
                  .reduce(add))
    average = totalCount / float(wordCounts.count())
    print totalCount
    print round(average, 2)
    

    Part 4 Apply word count to a file

    这里是综合了前面所有的知识,来对一个文本文件进行处理,还涉及到文本文件处理的一些方法。

    wordCount function

    # TODO: Replace <FILL IN> with appropriate code
    def wordCount(wordListRDD):
        """Creates a pair RDD with word counts from an RDD of words.
    
        Args:
            wordListRDD (RDD of str): An RDD consisting of words.
    
        Returns:
            RDD of (str, int): An RDD consisting of (word, count) tuples.
        """
        return wordListRDD.map(lambda x: [x,1]).reduceByKey(lambda a,b: a+b)
    print wordCount(wordsRDD).collect()
    

    Capitalization and punctuation

    真实的文本文件远比我们之前碰到的例子复杂。我们需要考虑三个问题:

    1. 单词里的大小写,例如Spark和spark应该是同一个单词
    2. 标点符号要去掉
    3. 单词前后的空格也要去掉
    # TODO: Replace <FILL IN> with appropriate code
    import re
    def removePunctuation(text):
        """Removes punctuation, changes to lower case, and strips leading and trailing spaces.
    
        Note:
            Only spaces, letters, and numbers should be retained.  Other characters should should be
            eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
            punctuation is removed.
    
        Args:
            text (str): A string.
    
        Returns:
            str: The cleaned up string.
        """
        return re.sub(r'[^ws]','',text).strip().lower()
    print removePunctuation('Hi, you!')
    print removePunctuation(' No under_score!')
    

    这里re.sub()里面的正则表达式有点复杂。大概意思是把所有的标点都换成'',这里的r表示原生字符串,也就是不进行转义,^表示否,w和s分别表示单词和空格。所以结果是只保留单词和空格。

    Load a text file

    这里用到了Complete Works of William Shakespeare,来自Project Gutenberg,作为文本。
    具体文件从哪里来,我们不用管,运行就行。

    # Just run this code
    import os.path
    baseDir = os.path.join('data')
    inputPath = os.path.join('cs100', 'lab1', 'shakespeare.txt')
    fileName = os.path.join(baseDir, inputPath)
    
    shakespeareRDD = (sc
                      .textFile(fileName, 8)
                      .map(removePunctuation))
    print '
    '.join(shakespeareRDD
                    .zipWithIndex()  # to (line, lineNum)
                    .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                    .take(15))
    

    Words from lines

    我们这里要把行转为单词。上面的RDD中,每个元素都是一行。假如我们直接用split方法,最后是生成一个list,我们还需要把list解开,所以这里用flatMap()

    # TODO: Replace <FILL IN> with appropriate code
    shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x : x.split(' '))
    shakespeareWordCount = shakespeareWordsRDD.count()
    print shakespeareWordsRDD.top(5)
    print shakespeareWordCount
    

    Remove empty elements

    在上面操作的基础上,去掉空的单词。

    # TODO: Replace <FILL IN> with appropriate code
    shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x !='')
    shakeWordCount = shakeWordsRDD.count()
    print shakeWordCount
    

    Count the words

    这个就和之前的工作一样了。不多说了。

    # TODO: Replace <FILL IN> with appropriate code
    top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15,key=lambda(w,c):-c)
    print '
    '.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))
    
  • 相关阅读:
    UVA 11174 Stand in a Line,UVA 1436 Counting heaps —— (组合数的好题)
    UVA 1393 Highways,UVA 12075 Counting Triangles —— (组合数,dp)
    【Same Tree】cpp
    【Recover Binary Search Tree】cpp
    【Binary Tree Zigzag Level Order Traversal】cpp
    【Binary Tree Level Order Traversal II 】cpp
    【Binary Tree Level Order Traversal】cpp
    【Binary Tree Post order Traversal】cpp
    【Binary Tree Inorder Traversal】cpp
    【Binary Tree Preorder Traversal】cpp
  • 原文地址:https://www.cnblogs.com/-Sai-/p/6662312.html
Copyright © 2011-2022 走看看