zoukankan      html  css  js  c++  java
  • spark实现wordcount的几种方式总结

    方法一:map + reduceByKey

    package com.cw.bigdata.spark.wordcount
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * WordCount, variant 1: map + reduceByKey.
      *
      * Classic shuffle-based word count: pair every word with 1,
      * then sum the 1s per key on the executors.
      *
      * @author 陈小哥cw
      * @date 2020/7/9 9:59
      */
    object WordCount1 {
      def main(args: Array[String]): Unit = {
        // Run locally on all available cores.
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount1")
        val sc: SparkContext = new SparkContext(conf)

        // Read the "in" directory, tokenize on spaces, count per word.
        val lines: RDD[String] = sc.textFile("in")
        lines
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey((a, b) => a + b)
          .collect()
          .foreach(println)
      }
    }
    
    

    方法二:使用countByValue代替map + reduceByKey

    package com.cw.bigdata.spark.wordcount
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * WordCount, variant 2: countByValue instead of map + reduceByKey.
      *
      * countByValue counts how many times each distinct element occurs and
      * returns the result as a local Map on the driver (it does not require
      * a key-value RDD). Internally it is equivalent to
      * map(value => (value, null)).countByKey().
      *
      * @author 陈小哥cw
      * @date 2020/7/9 10:02
      */
    object WordCount2 {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount2")
        val sc: SparkContext = new SparkContext(conf)

        val lines: RDD[String] = sc.textFile("in")
        val words: RDD[String] = lines.flatMap(line => line.split(" "))

        // countByValue is an action: the word -> count map lands on the driver.
        val counts = words.countByValue()
        counts.foreach(println)
      }
    }
    
    

    方法三:aggregateByKey或者foldByKey

    package com.cw.bigdata.spark.wordcount
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    /**
      * WordCount, variant 3: aggregateByKey or foldByKey.
      *
      * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
      *   1. zeroValue: initial value for every key in every partition;
      *   2. seqOp:  folds values into the accumulator within a partition;
      *   3. combOp: merges the per-partition accumulators across partitions.
      *
      * foldByKey is the simplified form of aggregateByKey where
      * seqOp and combOp are the same function.
      *
      * @author 陈小哥cw
      * @date 2020/7/9 10:08
      */
    object WordCount3 {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount3")
        val sc: SparkContext = new SparkContext(conf)

        val lines: RDD[String] = sc.textFile("in")
        val pairs: RDD[(String, Int)] = lines.flatMap(line => line.split(" ")).map(word => (word, 1))

        // Zero value 0; addition both inside and across partitions.
        pairs.aggregateByKey(0)((acc, v) => acc + v, (a, b) => a + b).collect().foreach(println)

        // Same computation, single function for both phases.
        pairs.foldByKey(0)((a, b) => a + b).collect().foreach(println)
      }
    }
    
    

    方法四:groupByKey+map

    package com.cw.bigdata.spark.wordcount
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    /**
      * WordCount, variant 4: groupByKey + map.
      *
      * Groups all the 1s for each word into an Iterable, then sums the
      * Iterable's contents. (Shuffles every element, unlike reduceByKey.)
      *
      * @author 陈小哥cw
      * @date 2020/7/9 13:32
      */
    object WordCount4 {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount4")
        val sc: SparkContext = new SparkContext(conf)

        val lines: RDD[String] = sc.textFile("in")

        // Each word maps to the collection of 1s produced for it.
        val grouped: RDD[(String, Iterable[Int])] =
          lines.flatMap(line => line.split(" ")).map(word => (word, 1)).groupByKey()

        // The sum of the 1s is the word's occurrence count.
        grouped.map { case (word, ones) => (word, ones.sum) }.collect().foreach(println)
      }
    }
    
    

    方法五:Scala原生实现wordcount

    package com.cw.bigdata.spark.wordcount
    
    
    /**
      * WordCount, variant 5: plain Scala collections, no Spark.
      *
      * @author 陈小哥cw
      * @date 2020/7/9 14:22
      */
    object WordCount5 {
      def main(args: Array[String]): Unit = {

        val sentences = List("cw is cool", "wc is beautiful", "andy is beautiful", "mike is cool")

        /**
          * Step 1: split every sentence on spaces and flatten the result.
          * map(_.split(" ")) tokenizes each sentence, flatten collapses the
          * nested collections; flatMap performs both steps at once.
          */
        val tokensViaFlatten = sentences.map(_.split(" ")).flatten
        val tokens = sentences.flatMap(_.split(" "))

        println("第一步结果")
        println(tokensViaFlatten)
        println(tokens)

        /**
          * Step 2: turn each word into a tuple.
          * Key is the word itself; the value just needs to exist — 1 here.
          */
        val wordPairs = tokens.map(word => (word, 1))
        println("第二步结果")
        println(wordPairs)

        /**
          * Step 3: group the tuples that share the same word.
          */
        val groupedPairs = wordPairs.groupBy(_._1)
        println("第三步结果")
        println(groupedPairs)

        /**
          * Final step: the size of each group is the word's occurrence count.
          */
        val wordCounts = groupedPairs.mapValues(_.size)
        println("最后一步结果")
        println(wordCounts.toBuffer)
      }
    }
    
    

    方法六:combineByKey

    package com.cw.bigdata.spark.wordcount
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    /**
      * WordCount, variant 6: combineByKey.
      *
      * combineByKey takes three functions: createCombiner (build an initial
      * accumulator from a value), mergeValue (fold a value into an
      * accumulator within a partition), and mergeCombiners (merge
      * accumulators across partitions). With identity + addition + addition
      * this reduces to a per-key sum.
      *
      * @author 陈小哥cw
      * @date 2020/7/9 22:55
      */
    object WordCount6 {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("combineByKey")
        val sc: SparkContext = new SparkContext(conf)

        val lines: RDD[String] = sc.textFile("in")
        val wordPairs: RDD[(String, Int)] = lines.flatMap(line => line.split(" ")).map(word => (word, 1))

        // Word count via combineByKey.
        wordPairs.combineByKey(
          (v: Int) => v,                     // createCombiner: first value becomes the accumulator
          (acc: Int, v: Int) => acc + v,     // mergeValue: within a partition
          (a: Int, b: Int) => a + b          // mergeCombiners: across partitions
        ).collect().foreach(println)
      }
    }
    
    
  • 相关阅读:
    Python获取 东方财富 7x24小时全球快讯
    Elasticsearch 环境配置
    可执行jar包与依赖jar包
    IDEA注释模板
    CKEditor
    解决让浏览器兼容ES6特性
    asp.net一个非常简单的分页
    Asp.Net真分页技术
    jsp选项卡导航实现——模板
    nodejs类比Java中:JVM
  • 原文地址:https://www.cnblogs.com/chenxiaoge/p/13335409.html
Copyright © 2011-2022 走看看