zoukankan      html  css  js  c++  java
  • 使用Spark-Core算子写的WordCount的11种解决方案

    通过Spark-Core API写的WordCount的11种解决方案:

    package com.fym.spark.core.wc
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import java.awt.image.IndexColorModel
    import scala.collection.mutable
    
    object Spark03_WordCount {

      /** Entry point: builds a local SparkContext, runs one of the word-count
        * variants below, then releases the context. */
      def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(sparkConf)

        wordCount11(sc)

        sc.stop()
      }

      /** Folds every (word, count) entry of `source` into `target` in place and
        * returns `target`. Shared by the reduce / aggregate / fold variants,
        * which previously duplicated this merge lambda inline four times. */
      private def mergeCounts(
        target: mutable.Map[String, Long],
        source: mutable.Map[String, Long]
      ): mutable.Map[String, Long] = {
        source.foreach { case (word, count) =>
          // 0L (not the deprecated lowercase 0l) when the word is unseen so far.
          target.update(word, target.getOrElse(word, 0L) + count)
        }
        target
      }

      // 1. groupBy: groups each word under itself, then counts group sizes.
      //    Shuffles the raw words with no map-side combining.
      def wordCount1(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))
        val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
        val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)

      }

      // 2. groupByKey: involves a shuffle; with large data sets the lack of
      //    map-side pre-aggregation hurts performance.
      def wordCount2(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_, 1))
        val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
        val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)

      }

      // 3. reduceByKey: pre-aggregates on the map side, which reduces the amount
      //    of data spilled to disk during the shuffle.
      def wordCount3(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_, 1))
        val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)

      }

      // 4. aggregateByKey: zero value 0; the same (+) is used both within and
      //    across partitions.
      def wordCount4(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_, 1))
        val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_ + _, _ + _)

      }

      // 5. foldByKey: shorthand for aggregateByKey when the intra- and
      //    inter-partition functions are identical.
      def wordCount5(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_, 1))
        val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)

      }

      // 6. combineByKey: the most general of the *ByKey aggregations.
      def wordCount6(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_, 1))
        val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
          v => v,
          // The first value's conversion is resolved dynamically, so the next
          // two parameters need explicit types.
          (x: Int, y: Int) => x + y,
          (x: Int, y: Int) => x + y
        )

      }

      // 7. countByKey: an action that returns the per-key counts to the driver.
      def wordCount7(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_, 1))
        val wordcount: scala.collection.Map[String, Long] = wordOne.countByKey()

      }

      // 8. countByValue: an action that counts each distinct element directly,
      //    so no (word, 1) mapping step is needed.
      def wordCount8(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))

        val wordcount: scala.collection.Map[String, Long] = words.countByValue()

      }

      // 9. reduce: each word becomes a single-entry mutable map; reduce merges
      //    the maps pairwise into the final word count.
      def wordCount9(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))

        // Desired shape: [(word1, sum1), (word2, sum2), ...]
        // string => Map(string -> 1)
        val mapWord = words.map(
          word => {
            mutable.Map[String, Long]((word, 1))
          }
        )

        val wordcount = mapWord.reduce(mergeCounts)

        println(wordcount)
      }

      // 10. aggregate: the same map-merge serves as both the intra-partition
      //     and inter-partition combiner; zero value is an empty mutable map.
      def wordCount10(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))

        // Desired shape: [(word1, sum1), (word2, sum2), ...]
        // string => Map(string -> 1)
        val mapWord = words.map(
          word => {
            mutable.Map[String, Long]((word, 1))
          }
        )

        val wordcount = mapWord.aggregate(mutable.Map[String, Long]())(mergeCounts, mergeCounts)

        println(wordcount)
      }

      // 11. fold: like aggregate, but with a single combine function for both
      //     the intra- and inter-partition merges.
      def wordCount11(sc: SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

        val words = rdd.flatMap(_.split(" "))

        // Desired shape: [(word1, sum1), (word2, sum2), ...]
        // string => Map(string -> 1)
        val mapWord = words.map(
          word => {
            mutable.Map[String, Long]((word, 1))
          }
        )

        val wordcount = mapWord.fold(mutable.Map[String, Long]())(mergeCounts)

        println(wordcount)
      }
    }
    
  • 相关阅读:
    BZOJ 2199 [Usaco2011 Jan]奶牛议会
    BZOJ 2621 [Usaco2012 Mar]Cows in a Skyscraper
    BZOJ 2272 [Usaco2011 Feb]Cowlphabet
    BZOJ 2580 [Usaco2012 Jan]Video Game
    BZOJ 2099 [Usaco2010 Dec]Letter 恐吓信
    maxcontent css 采用内部元素宽度值最大的那个元素
    JSON.parse()
    uniapp去除顶部标题样式
    logminer的使用
    tmpfs文件系统
  • 原文地址:https://www.cnblogs.com/yxym2016/p/14196874.html
Copyright © 2011-2022 走看看