zoukankan      html  css  js  c++  java
  • Spark Week 1 Homework

    package wikipedia
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import org.apache.log4j.{Level,Logger}
    
    
    /** A Wikipedia article: its title together with the full body text. */
    case class WikipediaArticle(title: String, text: String) {
        /**
          * Checks whether this article's text mentions `lang` as a
          * standalone token (the text is tokenised on single spaces, so
          * e.g. "Scala," with trailing punctuation does not match "Scala").
          *
          * @param lang language name to look for (e.g. "Scala")
          * @return true iff some space-separated token equals `lang`
          */
        def mentionsLanguage(lang: String): Boolean =
            text.split(' ').exists(token => token == lang)
    }
    
    object WikipediaRanking {
        // Silence Spark's verbose INFO/WARN logging; keep only errors.
        Logger.getLogger("org").setLevel(Level.ERROR)

        /** Languages whose popularity we rank. */
        val langs = List(
            "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
            "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

        // Build the context from the SparkConf (the original created `conf`
        // but never used it, passing raw strings to SparkContext instead).
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Wikipedia")
        val sc: SparkContext = new SparkContext(conf)

        /** All articles, parsed lazily from the raw dump: one article per input line. */
        // Hint: use a combination of `sc.textFile`, `WikipediaData.filePath` and `WikipediaData.parse`
        val wikiRdd: RDD[WikipediaArticle] = sc.textFile(WikipediaData.filePath).map(WikipediaData.parse)

        /** Returns the number of articles in `rdd` that mention `lang` at least once.
          *  Hint1: consider using method `aggregate` on RDD[T].
          *  Hint2: consider using method `mentionsLanguage` on `WikipediaArticle`
          */
        def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
            rdd.filter(_.mentionsLanguage(lang)).count().toInt

        /* (1) Use `occurrencesOfLang` to compute the ranking of the languages
         *     (`val langs`) by determining the number of Wikipedia articles that
         *     mention each language at least once. Don't forget to sort the
         *     languages by their occurrence, in decreasing order!
         *
         *   Note: this operation is long-running. It can potentially run for
         *   several seconds.
         */
        def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
            // The RDD is scanned once per language, so keep it in memory.
            rdd.cache()
            // Pair each language with its article count, then sort by count
            // descending (sortBy(-_._2), consistent with rankLangsUsingIndex,
            // instead of the ascending-sort-then-reverse of the original).
            langs.map(lang => (lang, occurrencesOfLang(lang, rdd))).sortBy(-_._2)
        }

        /* Compute an inverted index of the set of articles, mapping each language
         * to the Wikipedia pages in which it occurs.
         */
        def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
            // Emit one (lang, article) pair for every language the article
            // mentions, then group the articles under their language key.
            val articlesByLanguage = rdd.flatMap { article =>
                langs.filter(article.mentionsLanguage).map(lang => (lang, article))
            }
            articlesByLanguage.groupByKey()
        }

        /* (2) Compute the language ranking again, but now using the inverted index. Can you notice
         *     a performance improvement?
         *
         *   Note: this operation is long-running. It can potentially run for
         *   several seconds.
         */
        def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
            index.mapValues(_.size).sortBy(-_._2).collect().toList

        /* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
         *     Can you notice an improvement in performance compared to measuring *both* the computation of the index
         *     and the computation of the ranking? If so, can you think of a reason?
         *
         *   Note: this operation is long-running. It can potentially run for
         *   several seconds.
         */
        def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
            rdd.flatMap { article =>
                // One ((lang, 1)) pair per language this article mentions.
                langs.filter(article.mentionsLanguage).map((_, 1))
            }.reduceByKey(_ + _)    // sum the 1s per language
                .sortBy(-_._2)      // descending by count, done distributed before collect
                .collect()
                .toList
        }

        /** Entry point: runs and times all three ranking strategies. */
        def main(args: Array[String]): Unit = {

            /* Languages ranked according to (1) */
            val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

            /* An inverted index mapping languages to wikipedia pages on which they appear */
            def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

            /* Languages ranked according to (2), using the inverted index */
            val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

            /* Languages ranked according to (3) */
            val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

            /* Output the speed of each ranking */
            println(timing)
            sc.stop()
        }

        // Accumulates one timing line per measured stage.
        val timing = new StringBuffer

        /** Evaluates `code` (by-name), records how long it took under `label`,
          * and returns the result unchanged.
          */
        def timed[T](label: String, code: => T): T = {
            val start = System.currentTimeMillis()
            val result = code
            val stop = System.currentTimeMillis()
            // The original source had a literal line break inside this
            // single-quoted interpolated string, which does not compile in
            // Scala; use the \n escape instead.
            timing.append(s"Processing $label took ${stop - start} ms.\n")
            result
        }
    }
    
    
  • 相关阅读:
    1
    前端必读书籍推荐
    cn
    网站爬虫优化
    es学习
    适应移动端
    chrome禁止缓存,每次都最新的
    vue 源码环境
    [Java] 设计模式之工厂系列 04 (自定义模拟 spring 读取xml文件 beanFactory)
    [Java] JDOM 读取 xml 文件 示例程序初步
  • 原文地址:https://www.cnblogs.com/shayue/p/11229745.html
Copyright © 2011-2022 走看看