Spark Sql //************** Spark SQL ******************* 1、Spark SQL 是Spark套件中的一个模块,他将数据的计算任务通过SQL的形式转换成了RDD的计算,类似于Hive通过SQL的形式将数据的计算任务转换成了MapReduce。 2、Spark SQL的特点: 1、和Spark Core的无缝集成,我可以在写整个RDD应用的时候,配置Spark SQL来实现我的逻辑。 2、统一的数据访问方式,Spark SQL提供标准化的SQL查询。 3、Hive的继承,Spark SQL通过内嵌Hive或者连接外部已经部署好的hive实例,实现了对Hive语法的继承和操作。 4、标准化的连接方式,Spark SQL可以通过启动thrift Server来支持JDBC、ODBC的访问,将自己作为一个BI Server使用。 //************* Spark SQL 数据抽象 *********** 0、RDD(Spark1.0)-> DataFrame(Spark1.3)-> DataSet(Spark1.6) 1、Spark SQL提供了DataFrame和DataSet的数据抽象。 2、DataFrame就是RDD + Schema,可以认为是一张二维表格。他的劣势是在编译器不进行表格中的字段的类型检查。在运行期进行检查。 3、DataSet是Spark最新的数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame。DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。 4、DataFrame = DataSet[Row] 5、DataFrame和DataSet都有可控的内存管理机制,所有数据都保存在非堆上,都使用了catalyst进行SQL的优化。 //************** Spark SQL 客户端查询 ********** 1、你可以通过Spark-shell来操作Spark SQL,spark作为SparkSession的变量名,sc作为SparkContext的变量名 2、可以通过Spark提供的方法读取JSOn文件,将JSON文件转换成DataFrame 3、可以通过DataFrame提供的API来操作DataFrame里面的数据。 4、你可以通过将DataFrame注册成为一个临时表的方式,来通过Spark.sql方法运行标准的SQL语句来查询。 //************** DataFrame 查询方式 1、DataFrame支持两种查询方式一种是DSL风格,另外一种是SQL风格 1、DSL风格: 1、你需要引入 import spark.implicit._ 这个隐式转换,可以将DataFrame隐式转换成RDD。 2、SQL风格: 1、你需要将DataFrame注册成一张表格,如果你通过CreateTempView这种方式来创建,那么该表格Session有效,如果你通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀 global_temp 2、你需要通过sparkSession.sql 方法来运行你的SQL语句。 //************** DataSet 1、首先定义一个DataSet,你需要先定义一个Case类。 //************** RDD、DataSet、DataFrame之间的转换总结 ************* 1、 RDD -> DataFrame : rdd.map(para=>(para(0).trim(),para(1).trim().toInt)).toDF("name","age") //通过反射来设置 rdd.map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF() //通过编程方式来设置Schema,适合于编译期不能确定列的情况 schemaString.map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) val rdd[Row] = rdd.map(attributes => Row(attributes(0), attributes(1).trim)) val peopeDF = spark.createDataFrame(rdd[Row],schema) 2、 DataFrame -> RDD : dataFrame.rdd 注意输出:Array([Michael,29], [Andy,30], [Justin,19]) 1、 RDD -> DataSet : rdd.map(para=> Person(para(0).trim(),para(1).trim().toInt)).toDS 2、 DataSet -> RDD : dataSet.rdd 注意输出: Array(Person(Michael,29), Person(Andy,30), Person(Justin,19)) 1、 DataFrame -> DataSet: dataFrame.to[Person] 2、 DataSet -> DataFrame: dataSet.toDF //************** 对于DataFrame Row对象的访问方式 1、DataFrame = DataSet[Row], DataFrame里面每一行都是Row对象 2、如果需要访问Row对象中的每一个元素,你可以通过下标 row(0);你也可以通过列名 row.getAs[String]("name") //************** 应用UDF函数 *************** 1、通过spark.udf.register(name,func)来注册一个UDF函数,name是UDF调用时的标识符,fun是一个函数,用于处理字段。 2、你需要将一个DF或者DS注册为一个临时表。 3、通过spark.sql去运行一个SQL语句,在SQL语句中可以通过 name(列名) 方式来应用UDF函数。 //************* UDAF 用户自定义聚合函数 1、弱类型用户自定义聚合函数 1、新建一个Class 继承UserDefinedAggregateFunction ,然后复写方法: //聚合函数需要输入参数的数据类型 override def inputSchema: StructType = ??? //可以理解为保存聚合函数业务逻辑数据的一个数据结构 override def bufferSchema: StructType = ??? // 返回值的数据类型 override def dataType: DataType = ??? // 对于相同的输入一直有相同的输出 override def deterministic: Boolean = true //用于初始化你的数据结构 override def initialize(buffer: MutableAggregationBuffer): Unit = ??? //用于同分区内Row对聚合函数的更新操作 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ??? //用于不同分区对聚合结果的聚合。 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ??? //计算最终结果 override def evaluate(buffer: Row): Any = ??? 2、你需要通过spark.udf.resigter去注册你的UDAF函数。 3、需要通过spark.sql去运行你的SQL语句,可以通过 select UDAF(列名) 来应用你的用户自定义聚合函数。 2、强类型的用户自定义聚合函数 1、新建一个class,继承Aggregator[Employee, Average, Double],其中Employee是在应用聚合函数的时候传入的对象,Average是聚合函数在运行的时候内部需要的数据结构,Double是聚合函数最终需要输出的类型。这些可以根据自己的业务需求去调整。复写想对应的方法: //用于定义一个聚合函数内部需要的数据结构 override def zero: Average = ??? //针对每个分区内部每一个输入来更新你的数据结构 override def reduce(b: Average, a: Employee): Average = ??? //用于对于不同分区的结构进行聚合 override def merge(b1: Average, b2: Average): Average = ??? //计算输出 override def finish(reduction: Average): Double = ??? //用于数据结构他的转换 override def bufferEncoder: Encoder[Average] = ??? //用于最终结果的转换 override def outputEncoder: Encoder[Double] = ??? 2、新建一个UDAF实例,通过DF或者DS的DSL风格语法去应用。