zoukankan      html  css  js  c++  java
  • spark-------------8种实现wordCount方法

    引言

    通过学习RDD,并了解和掌握RDD的转换算子和行动算子。现在对所有能实现wordCount的功能的算子总结一下。

    正文

    用了8个方法来实现wordCount。通过对比,发现有些方法类似。运行结果读者自行验证

    代码

    package com.xiao.spark.core.wc
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object AllWoldCount {
      def main(args: Array[String]): Unit = {
    
    
    
          // 建立和spark框架的连接
          val conf = new SparkConf().setMaster("local").setAppName("WordCount");
          val sc = new SparkContext(conf);
          wordCount8(sc)
          sc.stop();
    
      }
        //  groupBy
        def wordCount1(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // 扁平化操作:拆分所有句子,把所有单词放到一个list里
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // 将所有相同的单词放到一个元组里
            val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
            val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
            println(wordCount.collect().mkString(","))
        }
    
        // groupByKey
        def wordCount2(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // 扁平化操作:拆分所有句子,把所有单词放到一个list里
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // 将每个单词变成 word => (word,1)
            val wordToOne: RDD[(String, Int)] = words.map(word => (word,1))  //ords.map((_,1))
            // (scala,CompactBuffer(1)),(spark,CompactBuffer(1)),(hello,CompactBuffer(1, 1))
            val group: RDD[(String, Iterable[Int])] = wordToOne.groupByKey()
            val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
            println(wordCount.collect().mkString(","))
        }
    
        // reduceByKey
        def wordCount3(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // 扁平化操作:拆分所有句子,把所有单词放到一个list里
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // 将每个单词变成 word => (word,1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
            println(wordCount.collect().mkString(","))
        }
    
        // aggregateByKey
        def wordCount4(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // 扁平化操作:拆分所有句子,把所有单词放到一个list里
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // 将每个单词变成 word => (word,1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            // 初始值 分区内的操作 分区间操作
            val wordCount: RDD[(String, Int)] = wordToOne.aggregateByKey(0)(_+_,_+_)
            println(wordCount.collect().mkString(","))
        }
    
        // foldByKey
        def wordCount5(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // 扁平化操作:拆分所有句子,把所有单词放到一个list里
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // 将每个单词变成 word => (word,1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            // 初始值 分区内和分区间操作相同
            val wordCount: RDD[(String, Int)] = wordToOne.foldByKey(0)(_+_)
            println(wordCount.collect().mkString(","))
        }
    
        // combineByKey
        def wordCount6(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // 扁平化操作:拆分所有句子,把所有单词放到一个list里
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // 将每个单词变成 word => (word,1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            // 初始值 分区内和分区间操作相同
            val wordCount: RDD[(String, Int)] = wordToOne.combineByKey(
                v => v,     // 初始值的操作
                (x : Int,y :Int) => x+y,  // 分区内的操作
                (x : Int,y :Int) => x+y,  // 分区间的操作
            )
            println(wordCount.collect().mkString(","))
        }
    
        // countByKey
        def wordCount7(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // 扁平化操作:拆分所有句子,把所有单词放到一个list里
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // 将每个单词变成 word => (word,1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            val wordCount: collection.Map[String, Long] = wordToOne.countByKey()
            println(wordCount)
        }
    
        // countByValue
        def wordCount8(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // 扁平化操作:拆分所有句子,把所有单词放到一个list里
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            val wordCount: collection.Map[String, Long] = words.countByValue()
            println(wordCount)
        }
    }
    
  • 相关阅读:
    Redis (error) NOAUTH Authentication required.问题
    C# File() 方法 contentType参数取值
    Java常用快捷键汇总(杂乱无章,持续更新)
    VUE项目发布至IIS(不涉及代理)
    js的休眠实现---sleep()
    Oracle 分页
    winform Panel设定 按钮显示位置
    Chart控件绘制实时折线图表——Spline
    .NET打包合并dll方法
    ComboBox绑定键值对
  • 原文地址:https://www.cnblogs.com/yangxiao-/p/14347390.html
Copyright © 2011-2022 走看看