zoukankan      html  css  js  c++  java
  • spark 系列之一 RDD的使用


    spark中常用的两种数据类型,一个是RDD,一个是DataFrame,本篇主要介绍RDD的一些应用场景见代码
    本代码的应用场景是在spark本地调试(windows环境)
    /**
     * 创建 sparkSession对象
     */
    val sparkSession = SparkSession.builder()
                      .appName("TextFile")
                      .master("local")
                      .getOrCreate()

    word.txt 的文本内容如下

    wordcount:三个算子搞定 flatMap 是把数据打平,map是对打平的数据每个计数一,reduceBykey是按照key进行分类汇总。

    /**
     * wordCount 程序,三个算子搞定
     */
    val peopleRDD1 = sparkSession.sparkContext
                                .textFile("file:///D:/software_download/spark_text/word.txt")
                                .flatMap(line=>line.split(" "))
                                .map(word=>(word,1))
                                .reduceByKey((a,b)=>a+b)
    
    peopleRDD1.foreach(println)


    Result:
          

      (scala,1)
      (faster,1)
      (is,1)
      (spark,2)
      (hadoop,1)
      (love,6)
      (i,6)
      (python,1)
      (nodejs,1)
      (java,1)

    
    

    按照key进行分组

     /**
       * 分组
     */
     val peopleRDD2 = sparkSession.sparkContext
                                    .textFile("file:///D:/software_download/spark_text/word.txt")
                                    .flatMap(line=>line.split(" "))
                                    .map(word=>(word,1))
                                    .groupByKey()
     peopleRDD2.foreach(println)

    Result:

    (scala,CompactBuffer(1))
    (spark,CompactBuffer(1, 1))
    (is,CompactBuffer(1))
    (faster,CompactBuffer(1))
    (hadoop,CompactBuffer(1))
    (love,CompactBuffer(1, 1, 1, 1, 1, 1))
    (i,CompactBuffer(1, 1, 1, 1, 1, 1))
    (python,CompactBuffer(1))
    (nodejs,CompactBuffer(1))
    (java,CompactBuffer(1))

    遍历RDD的keys和values,RDD中存放的是一个个对象,这点跟DataFrame不同,RDD中的对象对外的表现是黑盒的,即你不知道RDD中具体的字段是什么。DataFrame则不同,你可以清晰的看到DataFrame中所存放对象的内部结构。

    /**
      * RDD keys与values的遍历
      */
    
    peopleRDD1.keys.foreach(println)
    peopleRDD1.values.foreach(println)

    Result:

    scala
    faster
    is
    spark
    hadoop
    love
    i
    python
    nodejs
    java
    1
    1
    1
    2
    1
    6
    6
    1
    1
    1

    
    

    RDD 的其它操作,

      /**
    * 只针对value的值进行操作,以下两种操作等效,都是对key值加1操作
    */
    peopleRDD1.sortByKey().map(x=>(x._1,x._2+1)).foreach(println)
    peopleRDD1.sortByKey().mapValues(x=>x+1).foreach(println)

    //按照value值进行排序
    peopleRDD1.sortBy(x=>x._2,ascending = true).foreach(println)
    //按照key值进行排序
    peopleRDD1.sortByKey(ascending = true).foreach(println)

    /**
    * RDD之间的join操作
    */
    val pairRDD1 = sparkSession.sparkContext.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
    val pairRDD2 = sparkSession.sparkContext.parallelize(Array(("spark","fast")))
    val RDD1_join_RDD2 = pairRDD1.join(pairRDD2)
    RDD1_join_RDD2.foreach(println)

    思考题: 求该rdd,按照key进行分组后,value值得平均值,答案如下。

    求:写代码

    val rdd = sparkSession.sparkContext.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))

    Result:

    (spark,4)
    (hadoop,5)

     答案见系列之二。

  • 相关阅读:
    【Tomcat】export: `xxx': 不是有效的标识符
    【Linux】查看程序是否正常运行
    【Linux】bat文件如何执行
    【oracle】截取字符串
    【java异常】Expected one result (or null) to be returned by selectOne(), but found: 63
    【java异常】定时任务异常ERROR 20604 --- [ scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
    【oracle】ORA-12638
    【java异常】Unable to install breakpoint in
    【oracle】处理锁表
    20180318 代码错题(4)
  • 原文地址:https://www.cnblogs.com/suzhenxiang/p/14188220.html
Copyright © 2011-2022 走看看