zoukankan      html  css  js  c++  java
  • CS100.1x-lab1_word_count_student

    这是CS100.1x第一个提交的有意义的作业,自己一遍做下来对PySpark的基本应用应该是可以掌握的。相关ipynb文件见我github

    这次作业的目的如题目一样——word count,作业分成4个部分,遇到不懂的地方,时刻记得查API。

    Part 1 Creating a base RDD and pair RDDs

    Create a base RDD

    wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
    wordsRDD = sc.parallelize(wordsList, 4)
    # Print out the type of wordsRDD
    print type(wordsRDD)
    

    我们通过sc.parallelize方法来把python的list创建为RDD,打印的结果是

    <class 'pyspark.rdd.RDD'>
    

    我们可以看到,wordsRDD已经变成了pyspark.rdd.RDD类

    Pluralize and test

    下面几个将介绍map()的用法。注释里面带有TODO的就表示这里是作业,具体的功能实现在函数的注释里有。测试用例我就不在这里写了,大家可以去我的github上下载源文件。我这里重点讲功能实现。

    # TODO: Replace <FILL IN> with appropriate code
    def makePlural(word):
        """Adds an 's' to `word`.
    
        Note:
            This is a simple function that only adds an 's'.  No attempt is made to follow proper
            pluralization rules.
    
        Args:
            word (str): A string.
    
        Returns:
            str: A string with 's' added to it.
        """
        return word + 's'
    
    print makePlural('cat')
    
    # One way of completing the function
    def makePlural(word):
        return word + 's'
    
    print makePlural('cat')
    
    

    这里的函数实现就仅仅把单数变复数,所以在字符串上加's'就行。

    Apply makePlural to the base RDD

    # TODO: Replace <FILL IN> with appropriate code
    pluralRDD = wordsRDD.map(makePlural)
    print pluralRDD.collect()
    

    我可以看到map()里放入的是一个函数,作用是对RDD里的每个元素实现该函数的操作。

    Pass a lambda function to map

    # TODO: Replace <FILL IN> with appropriate code
    pluralLambdaRDD = wordsRDD.map(lambda x: x + 's')
    print pluralLambdaRDD.collect()
    

    这里的知识点是map和lamda的结合,在后面我们会越来越多的和lamda打交道,因为很多函数我们仅仅只用一次,所以lambda来定义匿名函数是再适合不过了。

    这里lamda后面的x可以是任何值,是占位符的作用,表示RDD里的任何一个元素,冒号后面的表示你要对该元素的操作。就这么简单。

    Length of each word

    # TODO: Replace <FILL IN> with appropriate code
    pluralLengths = (pluralRDD
                     .map(lambda x:len(x))
                     .collect())
    print pluralLengths
    

    这里要返回每个元素的长度,所以直接返回len(x)就行。

    Pair RDDs

    这里要介绍的是pair RDD,也就是key-value的RDD。python中,用tuple来实现。题目要求是把原来的RDD里的值变成(word, 1)的样子,用lambda很好解决。

    # TODO: Replace <FILL IN> with appropriate code
    wordPairs = wordsRDD.map(lambda x: (x,1))
    print wordPairs.collect()
    

    Part 2 Counting with pair RDDs

    groupByKey() approach

    我们先用groupByKey()来进行count word的操作。这个函数会把RDD里面相同的key放到一个list里面,然后存在一个数据块上。但是这样存在两个问题:

    1. 这种操作涉及到大量不同机器不同数据块的数据移动
    2. 生成的list会很大,可能对某一个worker来说,负担太重
    # TODO: Replace <FILL IN> with appropriate code
    # Note that groupByKey requires no parameters
    wordsGrouped = wordPairs.groupByKey()
    for key, value in wordsGrouped.collect():
        print '{0}: {1}'.format(key, list(value))
    

    我们看到,用法其实很简单,直接使用就行。但是最后的结果是这样,并没有把结果相加

    [('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])]
    

    Use groupByKey() to obtain the counts

    我们看到上面的结果是一个key对应了一个list,而并非是真正的结果,所以这里还需要进一步操作,计算list的长度,或者求和都可以。

    # TODO: Replace <FILL IN> with appropriate code
    wordCountsGrouped = wordsGrouped.mapValues(lambda x : sum(list(x)))
    print wordCountsGrouped.collect()
    

    这里我用了一个新的方法mapValues(),这个方法是直接传入value进行计算,不改变key的值。

    Counting using reduceByKey

    reduceByKey()相比于groupByKey()就有效得多。因为它是在本地计算好了后,进行reduce操作,类似与MapReduce里的combiner。其实这里还可以这么写

    wordCounts = wordPairs.reduceByKey(add)
    

    但是这么不让from operator import add,所以写成lambda表达式。

    All together

    把前面所有的知识点写成一句。。。

    # TODO: Replace <FILL IN> with appropriate code
    wordCountsCollected = (wordsRDD
                           .map(lambda x: (x,1))
                           .reduceByKey(lambda a,b: a+b)
                           .collect())
    print wordCountsCollected
    

    Part 3 Finding unique words and a mean value

    Unique words

    这里是计算RDD里不同单词的数目。我们在前面已经reduce过了,实际上把相同的合并了,这里只需要计算法RDD里的元素个数就行

    # TODO: Replace <FILL IN> with appropriate code
    uniqueWords = wordCounts.count()
    print uniqueWords
    

    Mean using reduce

    这里是要把所有的value加起来,然后除以元素的个数,也就是平均每个元素有多少次。用到了一个新的函数values(),这个方法直接返回所有value的list。

    # TODO: Replace <FILL IN> with appropriate code
    from operator import add
    totalCount = (wordCounts
                  .values()
                  .reduce(add))
    average = totalCount / float(wordCounts.count())
    print totalCount
    print round(average, 2)
    

    Part 4 Apply word count to a file

    这里是综合了前面所有的知识,来对一个文本文件进行处理,还涉及到文本文件处理的一些方法。

    wordCount function

    # TODO: Replace <FILL IN> with appropriate code
    def wordCount(wordListRDD):
        """Creates a pair RDD with word counts from an RDD of words.
    
        Args:
            wordListRDD (RDD of str): An RDD consisting of words.
    
        Returns:
            RDD of (str, int): An RDD consisting of (word, count) tuples.
        """
        return wordListRDD.map(lambda x: [x,1]).reduceByKey(lambda a,b: a+b)
    print wordCount(wordsRDD).collect()
    

    Capitalization and punctuation

    真实的文本文件远比我们之前碰到的例子复杂。我们需要考虑三个问题:

    1. 单词里的大小写,例如Spark和spark应该是同一个单词
    2. 标点符号要去掉
    3. 单词前后的空格也要去掉
    # TODO: Replace <FILL IN> with appropriate code
    import re
    def removePunctuation(text):
        """Removes punctuation, changes to lower case, and strips leading and trailing spaces.
    
        Note:
            Only spaces, letters, and numbers should be retained.  Other characters should should be
            eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
            punctuation is removed.
    
        Args:
            text (str): A string.
    
        Returns:
            str: The cleaned up string.
        """
        return re.sub(r'[^ws]','',text).strip().lower()
    print removePunctuation('Hi, you!')
    print removePunctuation(' No under_score!')
    

    这里re.sub()里面的正则表达式有点复杂。大概意思是把所有的标点都换成'',这里的r表示原生字符串,也就是不进行转义,^表示否,w和s分别表示单词和空格。所以结果是只保留单词和空格。

    Load a text file

    这里用到了Complete Works of William Shakespeare,来自Project Gutenberg,作为文本。
    具体文件从哪里来,我们不用管,运行就行。

    # Just run this code
    import os.path
    baseDir = os.path.join('data')
    inputPath = os.path.join('cs100', 'lab1', 'shakespeare.txt')
    fileName = os.path.join(baseDir, inputPath)
    
    shakespeareRDD = (sc
                      .textFile(fileName, 8)
                      .map(removePunctuation))
    print '
    '.join(shakespeareRDD
                    .zipWithIndex()  # to (line, lineNum)
                    .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                    .take(15))
    

    Words from lines

    我们这里要把行转为单词。上面的RDD中,每个元素都是一行。假如我们直接用split方法,最后是生成一个list,我们还需要把list解开,所以这里用flatMap()

    # TODO: Replace <FILL IN> with appropriate code
    shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x : x.split(' '))
    shakespeareWordCount = shakespeareWordsRDD.count()
    print shakespeareWordsRDD.top(5)
    print shakespeareWordCount
    

    Remove empty elements

    在上面操作的基础上,去掉空的单词。

    # TODO: Replace <FILL IN> with appropriate code
    shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x !='')
    shakeWordCount = shakeWordsRDD.count()
    print shakeWordCount
    

    Count the words

    这个就和之前的工作一样了。不多说了。

    # TODO: Replace <FILL IN> with appropriate code
    top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15,key=lambda(w,c):-c)
    print '
    '.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))
    
  • 相关阅读:
    GDB Practice
    GCC常用命令
    使用VS2010 C#编写ActiveX控件
    [.NET] 使用 .NET Framework 開發 ActiveX Control
    VC2005开发MFC ActiveX控件
    Register DLL and OCX
    COM组件开发实践
    Java Invoke C and C++ Using JNI
    Unable to cast object of type 'System.Int32' to type 'System.String'.
    SharePoint wiki 分类获取所有的
  • 原文地址:https://www.cnblogs.com/-Sai-/p/6662312.html
Copyright © 2011-2022 走看看