zoukankan      html  css  js  c++  java
  • sparksql笔记

    1、sparksql是Spark用来处理结构化数据的一个模块,它提供了两个抽象DataFrame和DataSet并且作为分布式SQL查询引擎的作用。

    Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快

    2、DataFrame,是数据结构,类似数据库表,Dataframe的劣势在于在编译期缺少类型安全检查,导致运行时出错。

    与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。
    同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。
    由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验
    DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待
    DataFrame也是懒执行的。
    性能上比RDD要高: 定制化内存管理,数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制

    3、DataSet,它是DataFrame的扩展,既具有类型安全检查也具有Dataframe的查询优化特性。

     1)是Dataframe API的一个扩展,是Spark最新的数据抽象
     2)用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。
     3)Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
     4)样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。
     5) Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。
         Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。
     6)DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].
     7)DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,
        比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,
        所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。

    参考demo:https://github.com/asker124143222/spark-demo

    示例:

    package com.home.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    
    object Ex_sparkSql {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark session")
    
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    
        val df: DataFrame = spark.read.json("input/userinfo.json")
    
        // df.show()
    
        df.createOrReplaceTempView("userinfo")
    
        spark.sql("select * from userinfo where age=30").show()
    
    
        //通过sparkSession生成rdd
        val rdd: RDD[(String, String)] = spark.sparkContext.textFile("input/1.txt").map(line => {
          val s = line.split(" ")
          (s(0), s(1))
        })
    //    rdd.collect().foreach(println)
    
        //rdd转换成df或者ds需要SparkSession实例的隐式转换
        //导入隐式转换,注意这里的spark不是包名,而是SparkSession的对象名
        import spark.implicits._
    
        //rdd转成DataFrame
        val frame: DataFrame = rdd.toDF("name","value")
    
        //DataFrame转成DataSet
        val ds: Dataset[MyClass] = frame.as[MyClass]
    
        //ds转成df
        val df2: DataFrame = ds.toDF()
    
        //df转成rdd
        val rdd2 : RDD[Row]= df2.rdd
    
        //打印
        rdd2.foreach(row=>{
          println(row.getString(0)+"  -- "+row.getString(1))
        })
    
        val myRDD: RDD[MyClass] = rdd.map {
          case (name, value) => {
            MyClass(name, value)
          }
        }
        val myDS = myRDD.toDS()
    
        println("---"*10)
        myDS.show()
    
        spark.stop()
      }
    }
    
    case class MyClass(name:String,value:String)
  • 相关阅读:
    Linux Sever简单笔记(第十二堂课)之linux下的系统故障分析和排查
    Linux Sever简单笔记(第十一堂课)之linux下的备份和恢复及rsync还有inotify和dump以及restore
    Linux Sever简单笔记(第十堂课)之linux下的任务计划及相关的命令
    ubuntu18.04设置apt源(国内)
    shell简单常用脚本实例
    装完ubuntu系统之后,不能ssh正常连接
    mysql主从复制以及读写分离
    复习计划
    linux下dhcp的安装及配置
    日常问题
  • 原文地址:https://www.cnblogs.com/asker009/p/12092641.html
Copyright © 2011-2022 走看看