zoukankan      html  css  js  c++  java
  • SparkSQL

    一、RDD、DataFrame和DataSet

    1,共性

    1、RDD、DataFrame和DateSet全都是spark平台下的弹性分布式数据集
    2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历
    3、三者都有partition的概念,有共同的函数如filter、排序等
    4、在对DataFrame和DataSet进行许多操作都需要进行支持import spark.implicits._

    2,区别

    1、RDD不支持sparksql,而DataFrame和DataSet则支持sparkSql可创建临时表/视图
    2、DataFrame和DataSet拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame也可以认为是Dataset[Row]

    二、自定义函数

    1,用户自定义UDF

    spark.udf.register("addName", (x:String)=> "Name:"+x) //spark(sparksession)
    spark.sql("Select addName(name), age from people").show()

    2,用户自定义UDAF

      a)弱类型

    **
      * 自定义聚合函数
      */
    class MyUDAF extends UserDefinedAggregateFunction{
      //输入参数的数据类型
      override def inputSchema: StructType = new StructType().add("age",LongType)
      //缓冲区数据类型
      override def bufferSchema: StructType = {
        new StructType().add("sum",LongType).add("count",LongType)
      }
      //输出数据类型
      override def dataType: DataType = DoubleType
      //对于相同的输入是否一直返回相同的输出。
      override def deterministic: Boolean = true
      //初始化
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0L
      }
      //更新
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getLong(0) + input.getLong(0) //input对应输入值
        buffer(1) = buffer.getLong(1) + 1
      }
      //不同的execute间的数据合并
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
      //计算最终输出
      override def evaluate(buffer: Row): Any = {
        buffer.getLong(0).toDouble / buffer.getLong(1)
      }
    }
    //自定义聚合函数的使用
    val udaf = new MyUDAF spark.udf.register("ageAvg",udaf) //注册自定义聚合函数 spark.sql("select avg(age) from user").show()

      b)强类型

    case class User(name:String,age:Long)
    case class Average(var sum:Long,var count :Long)
    
    /**
      * -IN, BUF, OUT
      * 输入为User   缓存为Average   输出为Double平均值
      */
    object MyAVG extends Aggregator[User,Average,Double] {
      //初始化
      override def zero: Average = Average(0L,0L)
      //化简
      override def reduce(b: Average, a: User): Average = {
        b.sum += a.age
        b.count += 1
        b
      }
      //Executor之间合并
      override def merge(b1: Average, b2: Average): Average = {
        b1.sum += b2.sum
        b1.count += b2.count
        b1
      }
      //最终输出
      override def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
    
      override def bufferEncoder: Encoder[Average] = Encoders.product //自定义数据类型
    
      override def outputEncoder: Encoder[Double] = Encoders.scalaDouble //基本数据类型
    }
    //使用案例
    val myavg = MyAVG.toColumn.name("myavg")
    import spark.implicits._
    df.as[User].select(myavg).show(

    三、Spark的数据源

    文件保存选项SaveMode

    SaveMode.Append          //追加
    SaveMode.Overwrite       //覆写
    SaveMode.ErrorIfExists   //默认,如果存在报错
    SaveMode.Ignore          //数据存在,则忽略

    1,json、parquet

    //通用方式
    spark.read.format("json").load("path")
    spark.read.format("parquet").load("path")
    //json方式
    spark.read.json("path")
    //parquet
    df.write.parquet("path")

    2,JDBC

    //读取方式一
    val jdbcDF = spark.read
        .format("jdbc")
        .option("url", "jdbc:mysql://linux1:3306/rdd")
        .option("dbtable", "tableName")
        .option("user", "root")
        .option("password", "000000")
        .load()
    //读取方式二
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "000000")
    val jdbcDF2 =spark.read.jdbc("jdbc:mysql://linux1:3306/rdd","tableName", connectionProperties)
    //写入方式一
    jdbcDF.write
        .format("jdbc")
        .option("url", "jdbc:mysql://linux1:3306/rdd")
        .option("dbtable", "tableName")
        .option("user", "root")
        .option("password", "000000")
        .save()
    //写入方式二
    jdbcDF2.write.jdbc("jdbc:mysql://linux1:3306/rdd", "db", connectionProperties)

    3,Hive

      如果是引用外部的hive,只需要将hive-site.xml,拷贝至Spark的conf目录下(spark与hive交互)或是拷贝至resource目录下。

  • 相关阅读:
    关于String和StringBuilder、StringBuffer的一个简单性能测试
    HTML网页BODY中如何设置背景图拉伸的最有效方法
    JS鼠标事件大全
    去除链接虚线框的推荐方法
    CSS实现文字颠倒旋转效果
    三种方法解决IE6下png透明失效的问题
    js获取节点 dom操作
    IE HACK
    javascript作用域(Scope)
    RGB配色表
  • 原文地址:https://www.cnblogs.com/bbgs-xc/p/13361834.html
Copyright © 2011-2022 走看看