zoukankan      html  css  js  c++  java
  • 006 Spark中的wordcount以及TopK的程序编写

    1.启动

      启动HDFS

      启动spark的local模式./spark-shell

    2.知识点

     textFile:

      def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String]

     Filter: 

      Return a new RDD containing only the elements that satisfy a predicate.

      def filter(f: T => Boolean): RDD[T],返回里面判断是true的RDD。

     map:

      Return a new RDD by applying a function to all elements of this RDD.
     def map[U: ClassTag](f: T => U): RDD[U],从T到U类型的一个数据转换函数,最终返回的RDD中的数据类型是f函数返回的数据类型

     flatMap:

        Return a new RDD by first applying a function to all elements of this
    RDD, and then flattening the results.
        def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
      从T到集合类型的数据类型转换,集合中的数据类型是U,最终返回的RDD数据类型是f函数返回的集合中的具体的类型数据。


    3.编写基础的wordcount程序
     1 //读取文件
     2 val rdd=sc.textFile("wc/input/wc.input")
     3 //过滤数据
     4 val filterRdd=rdd.filter(len=>len.length>0)
     5 //数据转换
     6 val flatMapRdd=filterRdd.flatMap(line=>line.split(" ")
     7     .map(word=>(word,1)))
     8 //分组
     9 val groupByRdd=flatMapRdd.groupBy(tuple=>tuple._1)
    10 //聚合
    11 val wordCount=groupByRdd.map(tuple=>{
    12     val word=tuple._1
    13     val sum=tuple._2.toList.foldLeft(0)((a,b)=>a+b._2)
    14     (word,sum)
    15 })
    16 //输出
    17 wordCount.foreach(println)             //控制台上的输出
    18 wordCount.saveAsTextFile("wc/output6") //HDFS上的输出

    4.简化代码(链式编程)

     1 sc.textFile("wc/input/wc.input").
     2 //数据过滤
     3 filter(_.length>0).
     4 //数据转换
     5 flatMap(_.split(" ").map((_,1))).
     6 //分组
     7 groupByKey().
     8 //统计
     9 map(tuple=>(tuple._1,tuple._2.toList.sum)).
    10 //输出
    11 saveAsTextFile("wc/output7")

    5.最优化程序

       reduceByKey存在combiner。

      groupBy在大数据量的情况下,会出现OOM

    1 sc.textFile("wc/input/wc.input").
    2 //数据过滤
    3 filter(_.length>0).
    4 //数据转换
    5 flatMap(_.split(" ").map((_,1))).
    6 //统计
    7 reduceByKey(_+_).
    8 //输出
    9 saveAsTextFile("wc/output8")

    6.显示结果

    1 sc.textFile("wc/input/wc.input").
    2 //数据过滤
    3 filter(_.length>0).
    4 //数据转换
    5 flatMap(_.split(" ").map((_,1))).
    6 //统计
    7 reduceByKey(_+_).
    8 collect()

    7.排序(第二个数,从大到小)

     1 sc.textFile("wc/input/wc.input").
     2 //数据过滤
     3 filter(_.length>0).
     4 //数据转换
     5 flatMap(_.split(" ").map((_,1))).
     6 //统计
     7 reduceByKey(_+_).
     8 //排序
     9 sortBy(tuple=>tuple._2,ascending=false).
    10 collect()

    8.TopK(方式一)

     1 sc.textFile("wc/input/wc.input").
     2 //数据过滤
     3 filter(_.length>0).
     4 //数据转换
     5 flatMap(_.split(" ").map((_,1))).
     6 //统计
     7 reduceByKey(_+_).
     8 //排序
     9 sortBy(tuple=>tuple._2,ascending=false).
    10 take(4)

    9.TopK(方式二,自定义)

     1 sc.textFile("wc/input/wc.input").
     2 //数据过滤
     3 filter(_.length>0).
     4 //数据转换
     5 flatMap(_.split(" ").map((_,1))).
     6 //统计
     7 reduceByKey(_+_).
     8 //排序
     9 sortBy(tuple=>tuple._2,ascending=false).
    10 top(3)(new scala.math.Ordering[(String,Int)](){
    11     override def compare(x:(String,Int),y:(String,Int))={
    12         val tmp=x._2.compare(y._2)
    13         if(tmp!=0) tmp
    14         else x._1.compare(x._1)
    15     }
    16     })
  • 相关阅读:
    第三天 moyax
    mkfs.ext3 option
    write file to stroage trigger kernel warning
    download fomat install rootfs script
    custom usb-seriel udev relus for compatible usb-seriel devices using kermit
    Wifi Troughput Test using iperf
    learning uboot switch to standby system using button
    learning uboot support web http function in qca4531 cpu
    learngin uboot design parameter recovery mechanism
    learning uboot auto switch to stanbdy system in qca4531 cpu
  • 原文地址:https://www.cnblogs.com/juncaoit/p/6371204.html
Copyright © 2011-2022 走看看