  • Spark: 8 ways to implement wordCount

    Introduction

    Having studied RDDs and gotten familiar with their transformation and action operators, this post summarizes all the operators that can be used to implement wordCount.

    Body

    Eight methods are used to implement wordCount. Comparing them shows that several of the methods are similar. Readers are encouraged to run the code and verify the results themselves.
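
    As a quick sanity check, every method below runs on the same two sample lines, "hello spark" and "hello scala", so they should all produce the same counts (pair ordering may differ between runs and partitionings):

        // wordCount1 ~ wordCount6 print the collected pairs, e.g.
        (spark,1),(hello,2),(scala,1)
        // wordCount7 and wordCount8 print a driver-side Map, e.g.
        Map(spark -> 1, scala -> 1, hello -> 2)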

    Code

    package com.xiao.spark.core.wc
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object AllWordCount {
      def main(args: Array[String]): Unit = {
          // Set up the connection to the Spark framework
          val conf = new SparkConf().setMaster("local").setAppName("WordCount")
          val sc = new SparkContext(conf)
          // Swap in any of wordCount1 ~ wordCount8 here
          wordCount8(sc)
          sc.stop()
      }
        //  groupBy
        def wordCount1(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // Flatten: split each sentence and put all the words into a single list
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // Group all occurrences of the same word together: (word, Iterable(word, ...))
            val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
            val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
            println(wordCount.collect().mkString(","))
        }
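        // Note: groupBy shuffles the word strings themselves, and the count is just
        // the size of each group's Iterable; simple, but the heaviest on the shuffle.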
    
        // groupByKey
        def wordCount2(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // Flatten: split each sentence and put all the words into a single list
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // Turn each word into a pair: word => (word, 1)
            val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))  // equivalent to words.map((_, 1))
            // groupByKey gathers all the 1s for each key:
            // (scala,CompactBuffer(1)),(spark,CompactBuffer(1)),(hello,CompactBuffer(1, 1))
            val group: RDD[(String, Iterable[Int])] = wordToOne.groupByKey()
            val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
            println(wordCount.collect().mkString(","))
        }
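        // Note: groupByKey is close to groupBy, but it groups the Int values (the 1s)
        // rather than the words; iter.sum would work here just as well as iter.size.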
    
        // reduceByKey
        def wordCount3(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // Flatten: split each sentence and put all the words into a single list
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // Turn each word into a pair: word => (word, 1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
            println(wordCount.collect().mkString(","))
        }
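        // Note: reduceByKey pre-aggregates inside each partition before shuffling
        // (map-side combine), so it moves less data than groupByKey; this is the
        // idiomatic wordCount in Spark.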
    
        // aggregateByKey
        def wordCount4(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // Flatten: split each sentence and put all the words into a single list
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // Turn each word into a pair: word => (word, 1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            // (zero value)(intra-partition op, inter-partition op)
            val wordCount: RDD[(String, Int)] = wordToOne.aggregateByKey(0)(_+_,_+_)
            println(wordCount.collect().mkString(","))
        }
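        // Note: aggregateByKey(zeroValue)(seqOp, combOp) lets the intra-partition
        // function differ from the inter-partition one; for a plain sum both are _+_.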
    
        // foldByKey
        def wordCount5(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // Flatten: split each sentence and put all the words into a single list
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // Turn each word into a pair: word => (word, 1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            // zero value; the same op is used both within and across partitions
            val wordCount: RDD[(String, Int)] = wordToOne.foldByKey(0)(_+_)
            println(wordCount.collect().mkString(","))
        }
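        // Note: foldByKey is the special case of aggregateByKey in which the
        // intra- and inter-partition functions are identical.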
    
        // combineByKey
        def wordCount6(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // Flatten: split each sentence and put all the words into a single list
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // Turn each word into a pair: word => (word, 1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            // combineByKey takes three functions instead of a zero value
            val wordCount: RDD[(String, Int)] = wordToOne.combineByKey(
                v => v,                    // createCombiner: initial accumulator from a key's first value
                (x: Int, y: Int) => x + y, // mergeValue: combine within a partition
                (x: Int, y: Int) => x + y  // mergeCombiners: combine across partitions
            )
            println(wordCount.collect().mkString(","))
        }
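        // Note: combineByKey is the most general of these operators; in current Spark
        // versions reduceByKey, foldByKey and aggregateByKey are all implemented on
        // top of combineByKeyWithClassTag.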
    
        // countByKey
        def wordCount7(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // Flatten: split each sentence and put all the words into a single list
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            // Turn each word into a pair: word => (word, 1)
            val wordToOne: RDD[(String, Int)] = words.map((_,1))
            val wordCount: collection.Map[String, Long] = wordToOne.countByKey()
            println(wordCount)
        }
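        // Note: countByKey is an action rather than a transformation; it returns a
        // scala.collection.Map to the driver, so it is only suitable when the set
        // of distinct keys fits in driver memory.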
    
        // countByValue
        def wordCount8(sc : SparkContext): Unit ={
            val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
            // Flatten: split each sentence and put all the words into a single list
            val words: RDD[String] = rdd.flatMap(_.split(" "))
            val wordCount: collection.Map[String, Long] = words.countByValue()
            println(wordCount)
        }
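        // Note: countByValue counts each distinct element directly, so the
        // map-to-(word, 1) step is unnecessary; like countByKey it is an action
        // that brings the result back to the driver.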
    }
    
  • Original post: https://www.cnblogs.com/yangxiao-/p/14347390.html