zoukankan      html  css  js  c++  java
  • sparkSQL-2

    SparkSQL-2

    1.前言

    sparksql-1

    2、JDBC数据源

    sparksql可以从mysql表中加载大量的数据,然后进行相应的统计分析查询,也可以把最后得到的结果数据写回到mysql表
    
    2.1 通过sparksql加载mysql表中的数据
    • 代码开发
    package cn.doit.sparksql
    
    import java.util.Properties
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql加载mysql表中的数据
    object DataFromMysql {
    
      def main(args: Array[String]): Unit = {
          //1、创建SparkConf对象
            val sparkConf: SparkConf = new SparkConf().setAppName("DataFromMysql").setMaster("local[2]")
    
         //2、创建SparkSession对象
          val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    
    
        //3、读取mysql表的数据
              //3.1 指定mysql连接地址
                val url="jdbc:mysql://node1:3306/spark?useSSL=false&characterEncoding=utf-8&serverTimezone=GMT%2B8"
              //3.2 指定要加载的表名
                val tableName="iplocation"
             // 3.3 配置连接数据库的相关属性
                 val properties = new Properties()
                 //用户名
                  properties.setProperty("user","root")
                 //密码
                  properties.setProperty("password","123456")
    
          val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)
        
          //打印schema信息
          mysqlDF.printSchema()
    
           //展示数据
          mysqlDF.show()
        
           //把dataFrame注册成表
        mysqlDF.createTempView("iplocation")
    
        spark.sql("select * from iplocation where total_count >1500").show()
    
        spark.stop()
      }
    }
    
    
    2.2 通过sparksql把分析到的结果数据保存到mysql表中
    • 代码开发
    package cn.doit.sparksql
    
    import java.util.Properties
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql进行分析统计之后的结果数据保存到mysql表中
    object DataSaveMysql {
    
      def main(args: Array[String]): Unit = {
          //1、创建SparkConf对象
            val sparkConf: SparkConf = new SparkConf().setAppName("DataSaveMysql").setMaster("local[2]")
    
         //2、创建SparkSession对象
          val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    
    
        //3、读取mysql表的数据
              //3.1 指定mysql连接地址
                val url="jdbc:mysql://node1:3306/spark"
              //3.2 指定要加载的表名
                val tableName="iplocation"
             // 3.3 配置连接数据库的相关属性
                 val properties = new Properties()
                 //用户名
                  properties.setProperty("user","root")
                 //密码
                  properties.setProperty("password","123456")
    
          val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)
    
          //打印schema信息
          //mysqlDF.printSchema()
    
           //展示数据
          //mysqlDF.show()
    
           //把dataFrame注册成表
        mysqlDF.createTempView("iplocation")
    
        //把数据接受到之后,进行进行对应的分析
        val result: DataFrame = spark.sql("select * from iplocation where total_count >1500")
    
        //保存result结果数据到mysql表中
         val destTable="result2"
         //数据写入到mysql表中可以调用mode方法,这里可以指定数据写入的模式
          //overwrite: 表示覆盖,如果表事先不存在,它先帮忙我们创建
          //append: 表示追加,如果表事先不存在,它先帮忙我们创建
          //ignore: 表示忽略 如果表事先存在,就不进行任何操作
          //error :表示报错, 如果表事先存在就报错(默认选项)
    
        result.write.mode("ignore").jdbc(url,destTable,properties)
    
        spark.stop()
    
    
      }
    }
    
    
    • 后期可以把程序改造一下,打成jar包提交到集群中运行
      package cn.doit.sparksql
      
    
    import java.util.Properties
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql进行分析统计之后的结果数据保存到mysql表中
    object DataSaveMysql {
    
      def main(args: Array[String]): Unit = {
          //1、创建SparkConf对象
            val sparkConf: SparkConf = new SparkConf().setAppName("DataSaveMysql")
    
         //2、创建SparkSession对象
          val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    
    
        //3、读取mysql表的数据
              //3.1 指定mysql连接地址
                val url="jdbc:mysql://node1:3306/spark"
              //3.2 指定要加载的表名
                val tableName=args(0)
             // 3.3 配置连接数据库的相关属性
                 val properties = new Properties()
                 //用户名
                  properties.setProperty("user","root")
                 //密码
                  properties.setProperty("password","123456")
        
          val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)
        
          //打印schema信息
          //mysqlDF.printSchema()
        
           //展示数据
          //mysqlDF.show()
        
           //把dataFrame注册成表
        mysqlDF.createTempView("iplocation")
        
        //把数据接受到之后,进行进行对应的分析
        val result: DataFrame = spark.sql("select * from iplocation where total_count >1500")
        
        //保存result结果数据到mysql表中
         val destTable=args(1)
         //数据写入到mysql表中可以调用mode方法,这里可以指定数据写入的模式
          //overwrite: 表示覆盖,如果表事先不存在,它先帮忙我们创建
          //append: 表示追加,如果表事先不存在,它先帮忙我们创建
          //ignore: 表示忽略 如果表事先存在,就不进行任何操作
          //error :表示报错, 如果表事先存在就报错(默认选项)
        
        result.write.mode("append").jdbc(url,destTable,properties)
        
        spark.stop()
    
    
      }
    }
    
    
    • 任务的提交脚本
    spark-submit 
    --master spark://node1:7077 
    --class cn.doit.sparksql.DataSaveMysql 
    --executor-memory 1g 
    --total-executor-cores 4 
    --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar 
    --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar 
    spark_doit_class08-1.0-shaded.jar 
    iplocation doit
    
    --driver-class-path:它把对应的jar包下发到Driver端
    --jars:它把对应的jar包下发到每一个executor进程
    

    3、sparksql保存数据处理结果的操作

    • 代码开发
    package cn.doit.sparksql
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:sparksql可以把结果数据保存到不同的外部存储介质中
    object SaveResult {
    
      def main(args: Array[String]): Unit = {
        //1、创建SparkConf对象
        val sparkConf: SparkConf = new SparkConf().setAppName("SaveResult").setMaster("local[2]")
    
        //2、创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    
        //3、加载数据源
          val jsonDF: DataFrame = spark.read.json("E:\data\score.json")
    
        //4、把DataFrame注册成表
          jsonDF.createTempView("t_score")
    
        //todo:5、统计分析
          val result: DataFrame = spark.sql("select * from t_score where score > 80")
    
            //保存结果数据到不同的外部存储介质中
            //todo: 5.1 保存结果数据到文本文件  ----  保存数据成文本文件目前只支持单个字段,不支持多个字段
            //result.select("name").write.text("./data/result/123.txt")
    
            //todo: 5.2 保存结果数据到json文件
            //result.write.json("./data/json")
    
            //todo: 5.3 保存结果数据到parquet文件
            //result.write.parquet("./data/parquet")
    
            //todo: 5.4 save方法保存结果数据,默认的数据格式就是parquet
            //result.write.save("./data/save")
    
            //todo: 5.5 保存结果数据到csv文件
            //result.write.csv("./data/csv")
    
            //todo: 5.6 保存结果数据到表中
            //result.write.saveAsTable("t1")
    
            //todo: 5.7  按照单个字段进行分区 分目录进行存储
            //result.write.partitionBy("classNum").json("./data/partitions")
    
            //todo: 5.8  按照多个字段进行分区 分目录进行存储
           result.write.partitionBy("classNum","name").json("./data/numPartitions")
    
    
        spark.stop()
      }
    
    }
    
    

    4、spark的窗口函数

    • 代码开发
    package cn.doit.sparksql
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object SparkTopN {
    
      def main(args: Array[String]): Unit = {
        //1、构建SparkSession
          val sparkSession: SparkSession = SparkSession.builder().appName("SparkTopN").master("local[2]").getOrCreate()
        sparkSession.sparkContext.setLogLevel("warn")
        //2、加载json数据源
          val jsonDF: DataFrame = sparkSession.read.json("E:\data\score.json")
    
        //3、注册成表
          jsonDF.createTempView("user_score")
    
        //4、统计分析
        //todo:4.1 取出每一个班级中学生成绩最高的
         sparkSession.sql("select * from (select *,row_number() over(partition by classNum order by score desc) as rn  from user_score) t where t.rn=1" ).show()
    
        //todo:4.2 取出每一个班级中学生成绩最高的前2位
        sparkSession.sql("select * from (select *,row_number() over(partition by classNum order by score desc) as rn  from user_score) t where t.rn <=2" ).show()
    
        //todo: 4.3 rank 可以跳跃进行排序,如果分数相同,这里的编号是一样,并且下面直接跳过
        sparkSession.sql("select *,rank() over(partition by classNum order by score desc) as rn  from user_score" ).show()
    
        //todo: 4.4 dense_rank 不跳跃进行排序,如果分数相同,这里的编号是一样,依次连续
        sparkSession.sql("select *,dense_rank() over(partition by classNum order by score desc) as rn  from user_score" ).show()
    
    
        sparkSession.stop()
    
      }
    }
    
    

    5、sparksql中自定义函数

    5.1 自定义UDF函数(一对一的关系)
    • 代码开发
    package cn.doit.sparksql
    
    import org.apache.spark.sql.api.java.UDF1
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //TODO:自定义sparksql的UDF函数    一对一的关系
    object SparkSQLFunction {
    
      def main(args: Array[String]): Unit = {
        //1、创建SparkSession
          val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQLFunction").master("local[2]").getOrCreate()
    
        //2、构建数据源生成DataFrame
           val dataFrame: DataFrame = sparkSession.read.text("E:\data\test_udf_data.txt")
    
        //3、注册成表
           dataFrame.createTempView("t_udf")
    
    
        //4、实现自定义的UDF函数
    
        //小写转大写
         sparkSession.udf.register("low2Up",new UDF1[String,String]() {
           override def call(t1: String): String = {
               t1.toUpperCase
           }
         },StringType)
    
        //大写转小写
        sparkSession.udf.register("up2low",(x:String)=>x.toLowerCase)
    
    
        //4、把数据文件中的单词统一转换成大小写
          sparkSession.sql("select  value from t_udf").show()
          sparkSession.sql("select  low2Up(value) from t_udf").show()
          sparkSession.sql("select  up2low(value) from t_udf").show()
    
    
        sparkSession.stop()
    
      }
    }
    
    
    • 接收2个Decimal参数,进行相加
    spark.udf.register("addRR2", (x:Double,y:Double) => {
          x + y
        })
    // UNIX_TIMESTAMP 将GMT 时间转换成时间戳
    val result: DataFrame = spark.sql("select addRR2(nni_50,pnni_50) as nni, UNIX_TIMESTAMP(create_time) as create_time, UNIX_TIMESTAMP(update_time) as update_time, status,mean_nni from hrv where mean_nni > 800")
    result.show()
    
    5.2 自定义UDAF聚合函数(多对一的关系)
    • 代码开发
    package cn.doit.sparksql
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:自定义UDAF函数----->多行输入一个输出
    object SparkSQLUDAFFunction {
    
      def main(args: Array[String]): Unit = {
        //1、创建SparkSession
            val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQLUDAFFunction").master("local[2]").getOrCreate()
    
       //2、读取数据文件
            val dataFrame: DataFrame = sparkSession.read.json("E:\data\test_udaf_data.json")
    
       //3、注册成表
             dataFrame.createTempView("t_udaf")
    
        //自定义一个udaf函数
          sparkSession.udf.register("avgSal",new MyUdafFunction)
    
       //4、通过sparksql执行sql语句,求出用户的平均薪水
         sparkSession.sql("select avgSal(salary) from t_udaf").show()
    
    
        sparkSession.stop()
      }
    }
    
    
    • 自定义UDAF类
    package cn.doit.sparksql
    
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._
    
    class MyUdafFunction  extends UserDefinedAggregateFunction{
       //输入数据的类型
      override def inputSchema: StructType ={
        StructType(StructField("input",LongType)::Nil)
      }
    
    
      //缓存区的数据类型 ,指定后期使用类似于2个字段进行对应的除法运行的字段类型   sum     countNum
      override def bufferSchema: StructType ={
        StructType(StructField("sum",LongType)::StructField("countNum",LongType)::Nil)
      }
    
      //最后的结果数据类型
      override def dataType: DataType ={
         DoubleType
      }
    
       //表示相同的输入是否得到相同的结果数
      override def deterministic: Boolean ={
          true
      }
    
      //给定数据的初始化值
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
            //给定金额字段的初始值  sum
            buffer(0)=0L
    
            //给定表的条数的初始值  countNum
            buffer(1)=0L
      }
    
      //更新数据
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
        //sum字段的结果数据
        buffer(0)=buffer.getLong(0) +  input.getLong(0)
    
        //countNum字段的结果数据
        buffer(1)=buffer.getLong(1) + 1
    
      }
    
       //由于程序要进行分布式计算,需要把每个分区处理的结果数据最后进行合并
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit ={
          //统计所有用户的总金额
         buffer1(0)=buffer1.getLong(0) +buffer2.getLong(0)
          //统计一共有多少个用户
         buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
      }
    
      //统计求平均薪水
      override def evaluate(buffer: Row): Any = {
        buffer.getLong(0).toDouble  / buffer.getLong(1)
    
      }
    }
    
    

    6、sparksql整合hive

    • 1、需要把hive安装目录下的配置文件hive-site.xml拷贝到每一个spark安装目录下对应的conf文件夹中

    • 2、需要一个连接mysql驱动的jar包拷贝到spark安装目录下对应的jars文件夹中

    • 3、可以使用spark-sql脚本 后期执行sql相关的任务

      spark-sql 
      --master spark://node1:7077 
      --executor-memory 1g 
      --total-executor-cores 4 
      --conf spark.sql.warehouse.dir=hdfs://node1:9000/user/hive/warehouse 
      
      
      #!/bin/sh
      #定义sparksql提交脚本的头信息
      SUBMITINFO="spark-sql --master spark://node1:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node1:9000/user/hive/warehouse" 
      #定义一个sql语句
      SQL="select * from employee;" 
      #执行sql语句   类似于 hive -e sql语句
      echo "$SUBMITINFO" 
      echo "$SQL"
      $SUBMITINFO -e "$SQL"
      
      set fileformat  查看文件格式 dos/unix
      set fileformat=unix  修改文件格式
      

    7、使用sparksql实现ip地址查询

    • 代码开发
    package cn.doit.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:通过开发sparksql程序来实现ip地址查询
    object SparkSqlIplocation {
    
      def main(args: Array[String]): Unit = {
        //1、构建SparkSession
           val sparkSession: SparkSession = SparkSession.builder().appName("SparkSqlIplocation").master("local[2]").getOrCreate()
    
        //2、读取文件数据
           val sc: SparkContext = sparkSession.sparkContext
             sc.setLogLevel("warn")
           //城市ip信息
          val city_ip_rdd: RDD[(Long, Long, String, String)] = sc.textFile("E:\data\ip.txt").map(x=>x.split("\|")).map(x=>(x(2).toLong,x(3).toLong,x(x.length-2),x(x.length-1)))
    
          //运营商日志数据
          val ipsRDD: RDD[String] = sc.textFile("E:\data\20090121000132.394251.http.format").map(x=>x.split("\|")(1))
          //把ipsRDD中的每一个ip转换成Long类型
          val ipLongRDD: RDD[Long] = ipsRDD.map(ip => {
            //ip转换Long类型
            val split: Array[String] = ip.split("\.")
            var ipNum = 0L
            for (i <- split) {
              ipNum = i.toLong | ipNum << 8L
            }
            ipNum
          })
    
    
       //3、分别把对应的rdd数据转换成dataFrame
             import sparkSession.implicits._
            //在toDF方法中可以手动指定字段的名称
            val cityIpDF: DataFrame = city_ip_rdd.toDF("ipStart","ipEnd","longitude","latitude")
            val ipsDF: DataFrame = ipLongRDD.toDF("ip")
    
       //4、注册成表
           cityIpDF.createTempView("t_city_ip")
           ipsDF.createTempView("t_user_ip")
    
      //5、统计每一个经纬度出现的总次数
          val result: DataFrame = sparkSession.sql("select t1.longitude,t1.latitude,count(*) as num  from t_city_ip t1 join  t_user_ip t2 on t2.ip between t1.ipStart and t1.ipEnd group by t1.longitude,t1.latitude")
          result.show()
    
        sparkSession.stop()
    
      }
    }
    
  • 相关阅读:
    GridView“gv_Info”激发了未处理的事件“RowEditing” “RowEditing”
    VS aspx页面在 设计视图 状态时 才可选用 工具 菜单下的 生成本地资源
    愿能与诸位关心的人及时保持互联
    [转]NOD32 與 無法將工作階段狀態要求送至工作階段狀態伺服器 NOD32与asp.net 状态服务
    [转]JavaScript:只能输入数字(IE、FF)
    勿以恶小而为之>致 被烟所包的程序员
    婚姻 一辈子的幸福厮守 请不要多拿彩礼和父母说事
    [文摘20090601]美国和中国老师讲灰姑娘的故事(差距啊~体现得淋漓尽致)
    多语言开发 之 通过基页类及Session 动态响应用户对语言的选择
    javascript的拖放(第1部分)
  • 原文地址:https://www.cnblogs.com/xujunkai/p/15007542.html
Copyright © 2011-2022 走看看