zoukankan      html  css  js  c++  java
  • 006 Spark中的wordcount以及TopK的程序编写

    1.启动

      启动HDFS

      启动spark的local模式./spark-shell

    2.知识点

     textFile:

      def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String]

     Filter: 

      Return a new RDD containing only the elements that satisfy a predicate.

      def filter(f: T => Boolean): RDD[T],返回里面判断是true的RDD。

     map:

      Return a new RDD by applying a function to all elements of this RDD.
     def map[U: ClassTag](f: T => U): RDD[U],从T到U类型的一个数据转换函数,最终返回的RDD中的数据类型是f函数返回的数据类型

     flatMap:

        Return a new RDD by first applying a function to all elements of this
    RDD, and then flattening the results.
        def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
      从T到集合类型的数据类型转换,集合中的数据类型是U,最终返回的RDD数据类型是f函数返回的集合中的具体的类型数据。


    3.编写基础的wordcount程序
     1 //读取文件
     2 val rdd=sc.textFile("wc/input/wc.input")
     3 //过滤数据
     4 val filterRdd=rdd.filter(len=>len.length>0)
     5 //数据转换
     6 val flatMapRdd=filterRdd.flatMap(line=>line.split(" ")
     7     .map(word=>(word,1)))
     8 //分组
     9 val groupByRdd=flatMapRdd.groupBy(tuple=>tuple._1)
    10 //聚合
    11 val wordCount=groupByRdd.map(tuple=>{
    12     val word=tuple._1
    13     val sum=tuple._2.toList.foldLeft(0)((a,b)=>a+b._2)
    14     (word,sum)
    15 })
    16 //输出
    17 wordCount.foreach(println)             //控制台上的输出
    18 wordCount.saveAsTextFile("wc/output6") //HDFS上的输出

    4.简化代码(链式编程)

     1 sc.textFile("wc/input/wc.input").
     2 //数据过滤
     3 filter(_.length>0).
     4 //数据转换
     5 flatMap(_.split(" ").map((_,1))).
     6 //分组
     7 groupByKey().
     8 //统计
     9 map(tuple=>(tuple._1,tuple._2.toList.sum)).
    10 //输出
    11 saveAsTextFile("wc/output7")

    5.最优化程序

       reduceByKey存在combiner。

      groupBy在大数据量的情况下,会出现OOM

    1 sc.textFile("wc/input/wc.input").
    2 //数据过滤
    3 filter(_.length>0).
    4 //数据转换
    5 flatMap(_.split(" ").map((_,1))).
    6 //统计
    7 reduceByKey(_+_).
    8 //输出
    9 saveAsTextFile("wc/output8")

    6.显示结果

    1 sc.textFile("wc/input/wc.input").
    2 //数据过滤
    3 filter(_.length>0).
    4 //数据转换
    5 flatMap(_.split(" ").map((_,1))).
    6 //统计
    7 reduceByKey(_+_).
    8 collect()

    7.排序(第二个数,从大到小)

     1 sc.textFile("wc/input/wc.input").
     2 //数据过滤
     3 filter(_.length>0).
     4 //数据转换
     5 flatMap(_.split(" ").map((_,1))).
     6 //统计
     7 reduceByKey(_+_).
     8 //排序
     9 sortBy(tuple=>tuple._2,ascending=false).
    10 collect()

    8.TopK(方式一)

     1 sc.textFile("wc/input/wc.input").
     2 //数据过滤
     3 filter(_.length>0).
     4 //数据转换
     5 flatMap(_.split(" ").map((_,1))).
     6 //统计
     7 reduceByKey(_+_).
     8 //排序
     9 sortBy(tuple=>tuple._2,ascending=false).
    10 take(4)

    9.TopK(方式二,自定义)

     1 sc.textFile("wc/input/wc.input").
     2 //数据过滤
     3 filter(_.length>0).
     4 //数据转换
     5 flatMap(_.split(" ").map((_,1))).
     6 //统计
     7 reduceByKey(_+_).
     8 //排序
     9 sortBy(tuple=>tuple._2,ascending=false).
    10 top(3)(new scala.math.Ordering[(String,Int)](){
    11     override def compare(x:(String,Int),y:(String,Int))={
    12         val tmp=x._2.compare(y._2)
    13         if(tmp!=0) tmp
    14         else x._1.compare(x._1)
    15     }
    16     })
  • 相关阅读:
    kibana.yml(中文配置详解)
    Elasticsearch之marvel(集群管理、监控)插件安装之后的浏览详解
    ElasticSearch vs Lucene多维度分析对比
    ElasticSearch 应用场景
    ElasticSearch 在Hadoop生态圈的位置
    ElasticSearch 工作原理
    ElasticSearch 架构图
    ElasticSearch vs 关系型数据库
    Codeforces Round #311 (Div. 2)
    uva 568(数学)
  • 原文地址:https://www.cnblogs.com/juncaoit/p/6371204.html
Copyright © 2011-2022 走看看