SparkSQL-2
1.前言
2、JDBC数据源
sparksql可以从mysql表中加载大量的数据,然后进行相应的统计分析查询,也可以把最后得到的结果数据写回到mysql表
2.1 通过sparksql加载mysql表中的数据
- 代码开发
package cn.doit.sparksql
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo:利用sparksql加载mysql表中的数据
object DataFromMysql {
def main(args: Array[String]): Unit = {
//1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("DataFromMysql").setMaster("local[2]")
//2、创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//3、读取mysql表的数据
//3.1 指定mysql连接地址
val url="jdbc:mysql://node1:3306/spark?useSSL=false&characterEncoding=utf-8&serverTimezone=GMT%2B8"
//3.2 指定要加载的表名
val tableName="iplocation"
// 3.3 配置连接数据库的相关属性
val properties = new Properties()
//用户名
properties.setProperty("user","root")
//密码
properties.setProperty("password","123456")
val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)
//打印schema信息
mysqlDF.printSchema()
//展示数据
mysqlDF.show()
//把dataFrame注册成表
mysqlDF.createTempView("iplocation")
spark.sql("select * from iplocation where total_count >1500").show()
spark.stop()
}
}
2.2 通过sparksql把分析到的结果数据保存到mysql表中
- 代码开发
package cn.doit.sparksql
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo:利用sparksql进行分析统计之后的结果数据保存到mysql表中
object DataSaveMysql {
def main(args: Array[String]): Unit = {
//1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("DataSaveMysql").setMaster("local[2]")
//2、创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//3、读取mysql表的数据
//3.1 指定mysql连接地址
val url="jdbc:mysql://node1:3306/spark"
//3.2 指定要加载的表名
val tableName="iplocation"
// 3.3 配置连接数据库的相关属性
val properties = new Properties()
//用户名
properties.setProperty("user","root")
//密码
properties.setProperty("password","123456")
val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)
//打印schema信息
//mysqlDF.printSchema()
//展示数据
//mysqlDF.show()
//把dataFrame注册成表
mysqlDF.createTempView("iplocation")
//把数据接受到之后,进行进行对应的分析
val result: DataFrame = spark.sql("select * from iplocation where total_count >1500")
//保存result结果数据到mysql表中
val destTable="result2"
//数据写入到mysql表中可以调用mode方法,这里可以指定数据写入的模式
//overwrite: 表示覆盖,如果表事先不存在,它先帮忙我们创建
//append: 表示追加,如果表事先不存在,它先帮忙我们创建
//ignore: 表示忽略 如果表事先存在,就不进行任何操作
//error :表示报错, 如果表事先存在就报错(默认选项)
result.write.mode("ignore").jdbc(url,destTable,properties)
spark.stop()
}
}
- 后期可以把程序改造一下,打成jar包提交到集群中运行
package cn.doit.sparksql
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo:利用sparksql进行分析统计之后的结果数据保存到mysql表中
object DataSaveMysql {
def main(args: Array[String]): Unit = {
//1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("DataSaveMysql")
//2、创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//3、读取mysql表的数据
//3.1 指定mysql连接地址
val url="jdbc:mysql://node1:3306/spark"
//3.2 指定要加载的表名
val tableName=args(0)
// 3.3 配置连接数据库的相关属性
val properties = new Properties()
//用户名
properties.setProperty("user","root")
//密码
properties.setProperty("password","123456")
val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)
//打印schema信息
//mysqlDF.printSchema()
//展示数据
//mysqlDF.show()
//把dataFrame注册成表
mysqlDF.createTempView("iplocation")
//把数据接受到之后,进行进行对应的分析
val result: DataFrame = spark.sql("select * from iplocation where total_count >1500")
//保存result结果数据到mysql表中
val destTable=args(1)
//数据写入到mysql表中可以调用mode方法,这里可以指定数据写入的模式
//overwrite: 表示覆盖,如果表事先不存在,它先帮忙我们创建
//append: 表示追加,如果表事先不存在,它先帮忙我们创建
//ignore: 表示忽略 如果表事先存在,就不进行任何操作
//error :表示报错, 如果表事先存在就报错(默认选项)
result.write.mode("append").jdbc(url,destTable,properties)
spark.stop()
}
}
- 任务的提交脚本
spark-submit
--master spark://node1:7077
--class cn.doit.sparksql.DataSaveMysql
--executor-memory 1g
--total-executor-cores 4
--driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar
--jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar
spark_doit_class08-1.0-shaded.jar
iplocation doit
--driver-class-path:它把对应的jar包下发到Driver端
--jars:它把对应的jar包下发到每一个executor进程
3、sparksql保存数据处理结果的操作
- 代码开发
package cn.doit.sparksql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo:sparksql可以把结果数据保存到不同的外部存储介质中
object SaveResult {
def main(args: Array[String]): Unit = {
//1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("SaveResult").setMaster("local[2]")
//2、创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//3、加载数据源
val jsonDF: DataFrame = spark.read.json("E:\data\score.json")
//4、把DataFrame注册成表
jsonDF.createTempView("t_score")
//todo:5、统计分析
val result: DataFrame = spark.sql("select * from t_score where score > 80")
//保存结果数据到不同的外部存储介质中
//todo: 5.1 保存结果数据到文本文件 ---- 保存数据成文本文件目前只支持单个字段,不支持多个字段
//result.select("name").write.text("./data/result/123.txt")
//todo: 5.2 保存结果数据到json文件
//result.write.json("./data/json")
//todo: 5.3 保存结果数据到parquet文件
//result.write.parquet("./data/parquet")
//todo: 5.4 save方法保存结果数据,默认的数据格式就是parquet
//result.write.save("./data/save")
//todo: 5.5 保存结果数据到csv文件
//result.write.csv("./data/csv")
//todo: 5.6 保存结果数据到表中
//result.write.saveAsTable("t1")
//todo: 5.7 按照单个字段进行分区 分目录进行存储
//result.write.partitionBy("classNum").json("./data/partitions")
//todo: 5.8 按照多个字段进行分区 分目录进行存储
result.write.partitionBy("classNum","name").json("./data/numPartitions")
spark.stop()
}
}
4、spark的窗口函数
- 代码开发
package cn.doit.sparksql
import org.apache.spark.sql.{DataFrame, SparkSession}
object SparkTopN {
def main(args: Array[String]): Unit = {
//1、构建SparkSession
val sparkSession: SparkSession = SparkSession.builder().appName("SparkTopN").master("local[2]").getOrCreate()
sparkSession.sparkContext.setLogLevel("warn")
//2、加载json数据源
val jsonDF: DataFrame = sparkSession.read.json("E:\data\score.json")
//3、注册成表
jsonDF.createTempView("user_score")
//4、统计分析
//todo:4.1 取出每一个班级中学生成绩最高的
sparkSession.sql("select * from (select *,row_number() over(partition by classNum order by score desc) as rn from user_score) t where t.rn=1" ).show()
//todo:4.2 取出每一个班级中学生成绩最高的前2位
sparkSession.sql("select * from (select *,row_number() over(partition by classNum order by score desc) as rn from user_score) t where t.rn <=2" ).show()
//todo: 4.3 rank 可以跳跃进行排序,如果分数相同,这里的编号是一样,并且下面直接跳过
sparkSession.sql("select *,rank() over(partition by classNum order by score desc) as rn from user_score" ).show()
//todo: 4.4 dense_rank 不跳跃进行排序,如果分数相同,这里的编号是一样,依次连续
sparkSession.sql("select *,dense_rank() over(partition by classNum order by score desc) as rn from user_score" ).show()
sparkSession.stop()
}
}
5、sparksql中自定义函数
5.1 自定义UDF函数(一对一的关系)
- 代码开发
package cn.doit.sparksql
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}
//TODO:自定义sparksql的UDF函数 一对一的关系
object SparkSQLFunction {
def main(args: Array[String]): Unit = {
//1、创建SparkSession
val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQLFunction").master("local[2]").getOrCreate()
//2、构建数据源生成DataFrame
val dataFrame: DataFrame = sparkSession.read.text("E:\data\test_udf_data.txt")
//3、注册成表
dataFrame.createTempView("t_udf")
//4、实现自定义的UDF函数
//小写转大写
sparkSession.udf.register("low2Up",new UDF1[String,String]() {
override def call(t1: String): String = {
t1.toUpperCase
}
},StringType)
//大写转小写
sparkSession.udf.register("up2low",(x:String)=>x.toLowerCase)
//4、把数据文件中的单词统一转换成大小写
sparkSession.sql("select value from t_udf").show()
sparkSession.sql("select low2Up(value) from t_udf").show()
sparkSession.sql("select up2low(value) from t_udf").show()
sparkSession.stop()
}
}
- 接收2个Decimal参数,进行相加
spark.udf.register("addRR2", (x:Double,y:Double) => {
x + y
})
// UNIX_TIMESTAMP 将GMT 时间转换成时间戳
val result: DataFrame = spark.sql("select addRR2(nni_50,pnni_50) as nni, UNIX_TIMESTAMP(create_time) as create_time, UNIX_TIMESTAMP(update_time) as update_time, status,mean_nni from hrv where mean_nni > 800")
result.show()
5.2 自定义UDAF聚合函数(多对一的关系)
- 代码开发
package cn.doit.sparksql
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo:自定义UDAF函数----->多行输入一个输出
object SparkSQLUDAFFunction {
def main(args: Array[String]): Unit = {
//1、创建SparkSession
val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQLUDAFFunction").master("local[2]").getOrCreate()
//2、读取数据文件
val dataFrame: DataFrame = sparkSession.read.json("E:\data\test_udaf_data.json")
//3、注册成表
dataFrame.createTempView("t_udaf")
//自定义一个udaf函数
sparkSession.udf.register("avgSal",new MyUdafFunction)
//4、通过sparksql执行sql语句,求出用户的平均薪水
sparkSession.sql("select avgSal(salary) from t_udaf").show()
sparkSession.stop()
}
}
- 自定义UDAF类
package cn.doit.sparksql
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
class MyUdafFunction extends UserDefinedAggregateFunction{
//输入数据的类型
override def inputSchema: StructType ={
StructType(StructField("input",LongType)::Nil)
}
//缓存区的数据类型 ,指定后期使用类似于2个字段进行对应的除法运行的字段类型 sum countNum
override def bufferSchema: StructType ={
StructType(StructField("sum",LongType)::StructField("countNum",LongType)::Nil)
}
//最后的结果数据类型
override def dataType: DataType ={
DoubleType
}
//表示相同的输入是否得到相同的结果数
override def deterministic: Boolean ={
true
}
//给定数据的初始化值
override def initialize(buffer: MutableAggregationBuffer): Unit = {
//给定金额字段的初始值 sum
buffer(0)=0L
//给定表的条数的初始值 countNum
buffer(1)=0L
}
//更新数据
override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
//sum字段的结果数据
buffer(0)=buffer.getLong(0) + input.getLong(0)
//countNum字段的结果数据
buffer(1)=buffer.getLong(1) + 1
}
//由于程序要进行分布式计算,需要把每个分区处理的结果数据最后进行合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit ={
//统计所有用户的总金额
buffer1(0)=buffer1.getLong(0) +buffer2.getLong(0)
//统计一共有多少个用户
buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
}
//统计求平均薪水
override def evaluate(buffer: Row): Any = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}
6、sparksql整合hive
-
1、需要把hive安装目录下的配置文件hive-site.xml拷贝到每一个spark安装目录下对应的conf文件夹中
-
2、需要一个连接mysql驱动的jar包拷贝到spark安装目录下对应的jars文件夹中
-
3、可以使用spark-sql脚本 后期执行sql相关的任务
spark-sql --master spark://node1:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node1:9000/user/hive/warehouse
#!/bin/sh #定义sparksql提交脚本的头信息 SUBMITINFO="spark-sql --master spark://node1:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node1:9000/user/hive/warehouse" #定义一个sql语句 SQL="select * from employee;" #执行sql语句 类似于 hive -e sql语句 echo "$SUBMITINFO" echo "$SQL" $SUBMITINFO -e "$SQL"
set fileformat 查看文件格式 dos/unix set fileformat=unix 修改文件格式
7、使用sparksql实现ip地址查询
- 代码开发
package cn.doit.sparksql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo:通过开发sparksql程序来实现ip地址查询
object SparkSqlIplocation {
def main(args: Array[String]): Unit = {
//1、构建SparkSession
val sparkSession: SparkSession = SparkSession.builder().appName("SparkSqlIplocation").master("local[2]").getOrCreate()
//2、读取文件数据
val sc: SparkContext = sparkSession.sparkContext
sc.setLogLevel("warn")
//城市ip信息
val city_ip_rdd: RDD[(Long, Long, String, String)] = sc.textFile("E:\data\ip.txt").map(x=>x.split("\|")).map(x=>(x(2).toLong,x(3).toLong,x(x.length-2),x(x.length-1)))
//运营商日志数据
val ipsRDD: RDD[String] = sc.textFile("E:\data\20090121000132.394251.http.format").map(x=>x.split("\|")(1))
//把ipsRDD中的每一个ip转换成Long类型
val ipLongRDD: RDD[Long] = ipsRDD.map(ip => {
//ip转换Long类型
val split: Array[String] = ip.split("\.")
var ipNum = 0L
for (i <- split) {
ipNum = i.toLong | ipNum << 8L
}
ipNum
})
//3、分别把对应的rdd数据转换成dataFrame
import sparkSession.implicits._
//在toDF方法中可以手动指定字段的名称
val cityIpDF: DataFrame = city_ip_rdd.toDF("ipStart","ipEnd","longitude","latitude")
val ipsDF: DataFrame = ipLongRDD.toDF("ip")
//4、注册成表
cityIpDF.createTempView("t_city_ip")
ipsDF.createTempView("t_user_ip")
//5、统计每一个经纬度出现的总次数
val result: DataFrame = sparkSession.sql("select t1.longitude,t1.latitude,count(*) as num from t_city_ip t1 join t_user_ip t2 on t2.ip between t1.ipStart and t1.ipEnd group by t1.longitude,t1.latitude")
result.show()
sparkSession.stop()
}
}