zoukankan      html  css  js  c++  java
  • Spark SQL+day04笔记

     

    SparkStreaming案例

     

    案例1-WordCount

    1609832955005

     

    yum install -y nc

    https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

    package cn.itcast.streaming

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.{SparkConf, SparkContext, streaming}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
    * Author itcast
    * Desc 使用SparkStreaming接收node1:9999的数据并做WordCount
    */
    object WordCount01 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
       val sc: SparkContext = new SparkContext(conf)
       sc.setLogLevel("WARN")
       //the time interval at which streaming data will be divided into batches
       val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))//每隔5s划分一个批次

       //TODO 1.加载数据
       val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)

       //TODO 2.处理数据
       val resultDS: DStream[(String, Int)] = lines.flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey(_ + _)

       //TODO 3.输出结果
       resultDS.print()

       //TODO 4.启动并等待结束
       ssc.start()
       ssc.awaitTermination()//注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

       //TODO 5.关闭资源
       ssc.stop(stopSparkContext = true, stopGracefully = true)//优雅关闭
    }
    }

     

     

    案例2-状态管理

    1609834587087

     

     

    package cn.itcast.streaming

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
    * Author itcast
    * Desc 使用SparkStreaming接收node1:9999的数据并做WordCount+实现状态管理:
    * 如输入spark hadoop 得到(spark,1),(hadoop,1)
    * 再下一个批次在输入 spark spark,得到(spark,3)
    */
    object WordCount02 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
       val sc: SparkContext = new SparkContext(conf)
       sc.setLogLevel("WARN")
       //the time interval at which streaming data will be divided into batches
       val ssc: StreamingContext = new StreamingContext(sc, Seconds(5)) //每隔5s划分一个批次

       //The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
       //注意:state存在checkpoint中
       ssc.checkpoint("./ckp")

       //TODO 1.加载数据
       val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)

       //TODO 2.处理数据
       //定义一个函数用来处理状态:把当前数据和历史状态进行累加
       //currentValues:表示该key(如:spark)的当前批次的值,如:[1,1]
       //historyValue:表示该key(如:spark)的历史值,第一次是0,后面就是之前的累加值如1
       val updateFunc = (currentValues: Seq[Int], historyValue: Option[Int]) => {
         if (currentValues.size > 0) {
           val currentResult: Int = currentValues.sum + historyValue.getOrElse(0)
           Some(currentResult)
        } else {
           historyValue
        }
      }

       val resultDS: DStream[(String, Int)] = lines.flatMap(_.split(" "))
        .map((_, 1))
         //.reduceByKey(_ + _)
         // updateFunc: (Seq[V], Option[S]) => Option[S]
        .updateStateByKey(updateFunc)

       //TODO 3.输出结果
       resultDS.print()

       //TODO 4.启动并等待结束
       ssc.start()
       ssc.awaitTermination() //注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

       //TODO 5.关闭资源
       ssc.stop(stopSparkContext = true, stopGracefully = true) //优雅关闭
    }
    }

     

    案例3-状态恢复-扩展

    package cn.itcast.streaming

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
    * Author itcast
    * Desc 使用SparkStreaming接收node1:9999的数据并做WordCount+实现状态管理+状态恢复
    * 如输入spark hadoop 得到(spark,1),(hadoop,1)
    * 再下一个批次在输入 spark spark,得到(spark,3)
    */
    object WordCount03 {
     def creatingFunc():StreamingContext ={
       //TODO 0.准备环境
       val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
       val sc: SparkContext = new SparkContext(conf)
       sc.setLogLevel("WARN")
       //the time interval at which streaming data will be divided into batches
       val ssc: StreamingContext = new StreamingContext(sc, Seconds(5)) //每隔5s划分一个批次

       //The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
       //注意:state存在checkpoint中
       ssc.checkpoint("./ckp")

       //TODO 1.加载数据
       val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)

       //TODO 2.处理数据
       //定义一个函数用来处理状态:把当前数据和历史状态进行累加
       //currentValues:表示该key(如:spark)的当前批次的值,如:[1,1]
       //historyValue:表示该key(如:spark)的历史值,第一次是0,后面就是之前的累加值如1
       val updateFunc = (currentValues: Seq[Int], historyValue: Option[Int]) => {
         if (currentValues.size > 0) {
           val currentResult: Int = currentValues.sum + historyValue.getOrElse(0)
           Some(currentResult)
        } else {
           historyValue
        }
      }

       val resultDS: DStream[(String, Int)] = lines.flatMap(_.split(" "))
        .map((_, 1))
         //.reduceByKey(_ + _)
         // updateFunc: (Seq[V], Option[S]) => Option[S]
        .updateStateByKey(updateFunc)

       //TODO 3.输出结果
       resultDS.print()

       ssc
    }
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val ssc: StreamingContext = StreamingContext.getOrCreate("./ckp", creatingFunc _)
       ssc.sparkContext.setLogLevel("WARN")

       //TODO 4.启动并等待结束
       ssc.start()
       ssc.awaitTermination() //注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

       //TODO 5.关闭资源
       ssc.stop(stopSparkContext = true, stopGracefully = true) //优雅关闭
    }
    }

     

    案例4-窗口计算

    1609837521571

     

    如实际开发中:

    每隔1min计算最近24小时的热搜排行榜

    每隔10s计算最近10分钟的广告点击量

    每隔1h计算最近7天的热搜

    1609837862069

     

    package cn.itcast.streaming

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
    * Author itcast
    * Desc 使用SparkStreaming接收node1:9999的数据并做WordCount+窗口计算
    * 每隔5s计算最近10s的数据
    */
    object WordCount04 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
       val sc: SparkContext = new SparkContext(conf)
       sc.setLogLevel("WARN")
       //the time interval at which streaming data will be divided into batches
       val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))//每隔5s划分一个批次

       //TODO 1.加载数据
       val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)

       //TODO 2.处理数据
       val resultDS: DStream[(String, Int)] = lines.flatMap(_.split(" "))
        .map((_, 1))
         //.reduceByKey(_ + _)
         //   windowDuration :窗口长度/窗口大小,表示要计算最近多长时间的数据
         //   slideDuration : 滑动间隔,表示每隔多长时间计算一次
         //   注意:windowDuration和slideDuration必须是batchDuration的倍数
         // 每隔5s(滑动间隔)计算最近10s(窗口长度/窗口大小)的数据
         //reduceByKeyAndWindow(聚合函数,windowDuration,slideDuration)
           //.reduceByKeyAndWindow(_+_,Seconds(10),Seconds(5))
        .reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
       //实际开发中需要我们掌握的是如何根据需求设置windowDuration和slideDuration
       //如:
       //每隔10分钟(滑动间隔slideDuration)更新最近24小时(窗口长度windowDuration)的广告点击数量
       // .reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Minutes(60*24),Minutes(10))

       //TODO 3.输出结果
       resultDS.print()

       //TODO 4.启动并等待结束
       ssc.start()
       ssc.awaitTermination()//注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

       //TODO 5.关闭资源
       ssc.stop(stopSparkContext = true, stopGracefully = true)//优雅关闭
    }
    }

    案例5-topN

    1609897142062

     

    package cn.itcast.streaming

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
    * Author itcast
    * Desc 使用SparkStreaming接收node1:9999的数据并做WordCount+窗口计算
    * 模拟百度热搜排行榜每隔10s计算最近20s的热搜词
    */
    object WordCount05 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
       val sc: SparkContext = new SparkContext(conf)
       sc.setLogLevel("WARN")
       //the time interval at which streaming data will be divided into batches
       val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))//每隔5s划分一个批次

       //TODO 1.加载数据
       val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)

       //TODO 2.处理数据
       val resultDS: DStream[(String, Int)] = lines.flatMap(_.split(" "))
        .map((_, 1))
         //模拟百度热搜排行榜每隔10s计算最近20s的热搜词Top3
         //windowDuration: Duration,
         //slideDuration: Duration
        .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10))
         //注意DStream没有提供直接排序的方法,所以需要直接对底层的RDD操作
       //DStream的transform方法表示对DStream底层的RDD进行操作并返回结果
       val sortedResultDS: DStream[(String, Int)] = resultDS.transform(rdd => {
         val sortRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
         val top3: Array[(String, Int)] = sortRDD.take(3)
         println("=======top3=====")
         top3.foreach(println)
         println("=======top3=====")
         sortRDD
      })

       //TODO 3.输出结果
       sortedResultDS.print()

       //TODO 4.启动并等待结束
       ssc.start()
       ssc.awaitTermination()//注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

       //TODO 5.关闭资源
       ssc.stop(stopSparkContext = true, stopGracefully = true)//优雅关闭
    }
    }
    /*
    31省新增本土确诊23例:河北20例 31省新增本土确诊23例:河北20例 31省新增本土确诊23例:河北20例 31省新增本土确诊23例:河北20例
    特朗普签令禁止与8款中国应用交易 特朗普签令禁止与8款中国应用交易
    纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉
    多家航空公司发布进出京退改票方案 多家航空公司发布进出京退改票方案 多家航空公司发布进出京退改票方案
    石家庄中小学幼儿园暂停线下教学
    */

     

    案例6-自定义输出

    1609898205003

     

    package cn.itcast.streaming

    import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
    * Author itcast
    * Desc 使用SparkStreaming接收node1:9999的数据并做WordCount+窗口计算
    * 模拟百度热搜排行榜每隔10s计算最近20s的热搜词
    * 最后使用自定义输出将结果输出到控制台/HDFS/MySQL
    */
    object WordCount06 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
       val sc: SparkContext = new SparkContext(conf)
       sc.setLogLevel("WARN")
       //the time interval at which streaming data will be divided into batches
       val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))//每隔5s划分一个批次

       //TODO 1.加载数据
       val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)

       //TODO 2.处理数据
       val resultDS: DStream[(String, Int)] = lines.flatMap(_.split(" "))
        .map((_, 1))
         //模拟百度热搜排行榜每隔10s计算最近20s的热搜词Top3
         //windowDuration: Duration,
         //slideDuration: Duration
        .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10))
         //注意DStream没有提供直接排序的方法,所以需要直接对底层的RDD操作
       //DStream的transform方法表示对DStream底层的RDD进行操作并返回结果
       val sortedResultDS: DStream[(String, Int)] = resultDS.transform(rdd => {
         val sortRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
         val top3: Array[(String, Int)] = sortRDD.take(3)
         println("=======top3=====")
         top3.foreach(println)
         println("=======top3=====")
         sortRDD
      })

       //TODO 3.输出结果
       sortedResultDS.print()//默认的输出
       //自定义输出
       sortedResultDS.foreachRDD((rdd,time)=>{
         val milliseconds: Long = time.milliseconds
         println("------自定义输出---------")
         println("batchtime:"+milliseconds)
         println("------自定义输出---------")
         //最后使用自定义输出将结果输出到控制台/HDFS/MySQL
         //输出到控制台
         rdd.foreach(println)
         //输出到HDFS
         rdd.coalesce(1).saveAsTextFile("data/output/result-"+milliseconds)
         //输出到MySQL
         /*
    CREATE TABLE `t_hotwords` (
     `time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
     `word` varchar(255) NOT NULL,
     `count` int(11) DEFAULT NULL,
     PRIMARY KEY (`time`,`word`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
          */
         rdd.foreachPartition(iter=>{
           //开启连接
           val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")
           val sql:String = "INSERT INTO `t_hotwords` (`time`, `word`, `count`) VALUES (?, ?, ?);"
           val ps: PreparedStatement = conn.prepareStatement(sql)
           iter.foreach(t=>{
             val word: String = t._1
             val count: Int = t._2
             ps.setTimestamp(1,new Timestamp(milliseconds) )
             ps.setString(2,word)
             ps.setInt(3,count)
             ps.addBatch()
          })
           ps.executeBatch()
           //关闭连接
           if (conn != null) conn.close()
           if (ps != null) ps.close()
        })
      })

       //TODO 4.启动并等待结束
       ssc.start()
       ssc.awaitTermination()//注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

       //TODO 5.关闭资源
       ssc.stop(stopSparkContext = true, stopGracefully = true)//优雅关闭
    }
    }
    /*
    31省新增本土确诊23例:河北20例 31省新增本土确诊23例:河北20例 31省新增本土确诊23例:河北20例 31省新增本土确诊23例:河北20例
    特朗普签令禁止与8款中国应用交易 特朗普签令禁止与8款中国应用交易
    纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉 纸张价格上涨直逼猪肉
    多家航空公司发布进出京退改票方案 多家航空公司发布进出京退改票方案 多家航空公司发布进出京退改票方案
    石家庄中小学幼儿园暂停线下教学
    */

     

    SparkStreaming整合Kafka

    前置说明

    SparkStreaming+Kafka流程

    流式数据 ---> (Flume)---->Kafka--->SparkStreaming/StructStreaming/Flink--->Redis/HBase/HDFS

     

     

    Kafka简单回顾

    1609899734842

     

     

    #启动kafka
    /export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties

    #停止kafka
    /export/server/kafka/bin/kafka-server-stop.sh

    #查看topic信息
    /export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181

    #创建topic
    /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic test

    #查看某个topic信息
    /export/server/kafka/bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test

    #删除topic
    /export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --delete --topic test

    #启动生产者--控制台的生产者--一般用于测试
    /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka


    # 启动消费者--控制台的消费者
    /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark_kafka --from-beginning

     

     

    SparkStreaming连接Kafka两种方式

    1609899922563

     

    1609900032215

     

     

    1609900089165

     

     

    两种API

    1609900128703

     

    1609900253261

     

    总结

    在学习和开发中都是直接使用spark-Streaming-kafka-0.10版本API中的Direct模式来连接Kafka!

     

     

    代码演示-1-自动提交偏移量

    http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

    自动提交偏移量到默认主题和Checkpoint中

    package cn.itcast.streaming

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
    * Author itcast
    * Desc 演示使用spark-streaming-kafka-0-10_2.12中的Direct模式连接Kafka消费数据
    */
    object SparkStreaming_Kafka_Demo01 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
       val sc: SparkContext = new SparkContext(conf)
       sc.setLogLevel("WARN")
       //the time interval at which streaming data will be divided into batches
       val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))//每隔5s划分一个批次
       ssc.checkpoint("./ckp")

       //TODO 1.加载数据-从Kafka
       val kafkaParams = Map[String, Object](
         "bootstrap.servers" -> "node1:9092",//kafka集群地址
         "key.deserializer" -> classOf[StringDeserializer],//key的反序列化规则
         "value.deserializer" -> classOf[StringDeserializer],//value的反序列化规则
         "group.id" -> "sparkdemo",//消费者组名称
         //earliest:表示如果有offset记录从offset记录开始消费,如果没有从最早的消息开始消费
         //latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
         //none:表示如果有offset记录从offset记录开始消费,如果没有就报错
         "auto.offset.reset" -> "latest",
         "auto.commit.interval.ms"->"1000",//自动提交的时间间隔
         "enable.auto.commit" -> (true: java.lang.Boolean)//是否自动提交
      )
       val topics = Array("spark_kafka")//要订阅的主题
       //使用工具类从Kafka中消费消息
       val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
         ssc,
         LocationStrategies.PreferConsistent, //位置策略,使用源码中推荐的
         ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) //消费策略,使用源码中推荐的
      )

       //TODO 2.处理消息
       val infoDS: DStream[String] = kafkaDS.map(record => {
         val topic: String = record.topic()
         val partition: Int = record.partition()
         val offset: Long = record.offset()
         val key: String = record.key()
         val value: String = record.value()
         val info: String = s"""topic:${topic}, partition:${partition}, offset:${offset}, key:${key}, value:${value}"""
         info
      })

       //TODO 3.输出结果
       infoDS.print()

       //TODO 4.启动并等待结束
       ssc.start()
       ssc.awaitTermination()//注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

       //TODO 5.关闭资源
       ssc.stop(stopSparkContext = true, stopGracefully = true)//优雅关闭
    }
    }
    //测试:
    //1.准备kafka
    // /export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
    // /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic spark_kafka
    // /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka
    //2.启动程序
    //3.发送数据
    //4.观察结果

     

     

    代码演示-2-手动提交

    package cn.itcast.streaming

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
    * Author itcast
    * Desc 演示使用spark-streaming-kafka-0-10_2.12中的Direct模式连接Kafka消费数据+手动提交offset
    */
    object SparkStreaming_Kafka_Demo02 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
       val sc: SparkContext = new SparkContext(conf)
       sc.setLogLevel("WARN")
       //the time interval at which streaming data will be divided into batches
       val ssc: StreamingContext = new StreamingContext(sc, Seconds(5)) //每隔5s划分一个批次
       ssc.checkpoint("./ckp")

       //TODO 1.加载数据-从Kafka
       val kafkaParams = Map[String, Object](
         "bootstrap.servers" -> "node1:9092", //kafka集群地址
         "key.deserializer" -> classOf[StringDeserializer], //key的反序列化规则
         "value.deserializer" -> classOf[StringDeserializer], //value的反序列化规则
         "group.id" -> "sparkdemo", //消费者组名称
         //earliest:表示如果有offset记录从offset记录开始消费,如果没有从最早的消息开始消费
         //latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
         //none:表示如果有offset记录从offset记录开始消费,如果没有就报错
         "auto.offset.reset" -> "latest",
         //"auto.commit.interval.ms"->"1000",//自动提交的时间间隔
         "enable.auto.commit" -> (false: java.lang.Boolean) //是否自动提交
      )
       val topics = Array("spark_kafka") //要订阅的主题
       //使用工具类从Kafka中消费消息
       val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
         ssc,
         LocationStrategies.PreferConsistent, //位置策略,使用源码中推荐的
         ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) //消费策略,使用源码中推荐的
      )

       //TODO 2.处理消息
       //注意提交的时机:应该是消费完一小批就该提交一次offset,而在DStream一小批的体现是RDD
       kafkaDS.foreachRDD(rdd => {
         if(!rdd.isEmpty()){
           //消费
           rdd.foreach(record => {
             val topic: String = record.topic()
             val partition: Int = record.partition()
             val offset: Long = record.offset()
             val key: String = record.key()
             val value: String = record.value()
             val info: String = s"""topic:${topic}, partition:${partition}, offset:${offset}, key:${key}, value:${value}"""
             println("消费到的消息的详细信息为: "+info)
          })
           //获取rdd中offset相关的信息:offsetRanges里面就包含了该批次各个分区的offset信息
           val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
           //提交
           kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
           println("当前批次的数据已消费并手动提交")
        }
      })

       //TODO 3.输出结果

       //TODO 4.启动并等待结束
       ssc.start()
       ssc.awaitTermination() //注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

       //TODO 5.关闭资源
       ssc.stop(stopSparkContext = true, stopGracefully = true) //优雅关闭
    }
    }

    //测试:
    //1.准备kafka
    // /export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
    // /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic spark_kafka
    // /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka
    //2.启动程序
    //3.发送数据
    //4.观察结果

     

    代码演示-3-手动提交到MySQL-扩展

    package cn.itcast.streaming

    import java.sql.{DriverManager, ResultSet}

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.collection.mutable

    /**
    * Author itcast
    * Desc 演示使用spark-streaming-kafka-0-10_2.12中的Direct模式连接Kafka消费数据+手动提交offset到MySQL
    */
    object SparkStreaming_Kafka_Demo03 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
       val sc: SparkContext = new SparkContext(conf)
       sc.setLogLevel("WARN")
       //the time interval at which streaming data will be divided into batches
       val ssc: StreamingContext = new StreamingContext(sc, Seconds(5)) //每隔5s划分一个批次
       ssc.checkpoint("./ckp")

       //TODO 1.加载数据-从Kafka
       val kafkaParams = Map[String, Object](
         "bootstrap.servers" -> "node1:9092", //kafka集群地址
         "key.deserializer" -> classOf[StringDeserializer], //key的反序列化规则
         "value.deserializer" -> classOf[StringDeserializer], //value的反序列化规则
         "group.id" -> "sparkdemo", //消费者组名称
         //earliest:表示如果有offset记录从offset记录开始消费,如果没有从最早的消息开始消费
         //latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
         //none:表示如果有offset记录从offset记录开始消费,如果没有就报错
         "auto.offset.reset" -> "latest",
         //"auto.commit.interval.ms"->"1000",//自动提交的时间间隔
         "enable.auto.commit" -> (false: java.lang.Boolean) //是否自动提交
      )
       val topics = Array("spark_kafka") //要订阅的主题

       //Map[主题分区, offset]
       val offsetsMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("sparkdemo","spark_kafka")
       val kafkaDS: InputDStream[ConsumerRecord[String, String]] =  if(offsetsMap.size > 0){
         println("MySQL中存储了该消费者组消费该主题的偏移量记录,接下来从记录处开始消费")
         //使用工具类从Kafka中消费消息
         KafkaUtils.createDirectStream[String, String](
           ssc,
           LocationStrategies.PreferConsistent, //位置策略,使用源码中推荐的
           ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetsMap) //消费策略,使用源码中推荐的
        )
      }else{
         println("MySQL中没有存储该消费者组消费该主题的偏移量记录,接下来从latest开始消费")
         //使用工具类从Kafka中消费消息
         KafkaUtils.createDirectStream[String, String](
           ssc,
           LocationStrategies.PreferConsistent, //位置策略,使用源码中推荐的
           ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) //消费策略,使用源码中推荐的
        )
      }


       //TODO 2.处理消息
       //注意提交的时机:应该是消费完一小批就该提交一次offset,而在DStream一小批的体现是RDD
       kafkaDS.foreachRDD(rdd => {
         if(!rdd.isEmpty()){
           //消费
           rdd.foreach(record => {
             val topic: String = record.topic()
             val partition: Int = record.partition()
             val offset: Long = record.offset()
             val key: String = record.key()
             val value: String = record.value()
             val info: String = s"""topic:${topic}, partition:${partition}, offset:${offset}, key:${key}, value:${value}"""
             println("消费到的消息的详细信息为: "+info)
          })
           //获取rdd中offset相关的信息:offsetRanges里面就包含了该批次各个分区的offset信息
           val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
           //提交
           //kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
           //提交到MySQL
           OffsetUtil.saveOffsetRanges("sparkdemo",offsetRanges)
           println("当前批次的数据已消费并手动提交到MySQL")
        }
      })

       //TODO 3.输出结果

       //TODO 4.启动并等待结束
       ssc.start()
       ssc.awaitTermination() //注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

       //TODO 5.关闭资源
       ssc.stop(stopSparkContext = true, stopGracefully = true) //优雅关闭
    }
     /*
    手动维护offset的工具类
    首先在MySQL创建如下表
      CREATE TABLE `t_offset` (
        `topic` varchar(255) NOT NULL,
        `partition` int(11) NOT NULL,
        `groupid` varchar(255) NOT NULL,
        `offset` bigint(20) DEFAULT NULL,
        PRIMARY KEY (`topic`,`partition`,`groupid`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
     */
     object OffsetUtil {
       //1.将偏移量保存到数据库
       def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
         val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
         //replace into表示之前有就替换,没有就插入
         val ps = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
         for (o <- offsetRange) {
           ps.setString(1, o.topic)
           ps.setInt(2, o.partition)
           ps.setString(3, groupid)
           ps.setLong(4, o.untilOffset)
           ps.executeUpdate()
        }
         ps.close()
         connection.close()
      }

       //2.从数据库读取偏移量Map(主题分区,offset)
       def getOffsetMap(groupid: String, topic: String) = {
         val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
         val ps = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")
         ps.setString(1, groupid)
         ps.setString(2, topic)
         val rs: ResultSet = ps.executeQuery()
         //Map(主题分区,offset)
         val offsetMap: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
         while (rs.next()) {
           offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
        }
         rs.close()
         ps.close()
         connection.close()
         offsetMap
      }
    }

    }

    //测试:
    //1.准备kafka
    // /export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
    // /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic spark_kafka
    // /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka
    //2.启动程序
    //3.发送数据
    //4.观察结果

     

    SparkSQL概述

    数据分析方式

    1609915471197

     

    缺点: 有一定的学习成本/入门门槛

    优点: 灵活!可以使用底层的API完成很复杂的业务

     

    1609915522457

     

    优点:入门门槛低,只要会英文单词/简单语法规则就可以写

    缺点:只能做一些简单的业务,复杂业务实现起来较困难

     

    SparkSQL发展历史

    1609916124754

     

    1609916176807

     

     

    SparkSQL官方介绍

    1609916264432

     

     

    1609916323245

     

    1609916425852

     

    1609916457776

     

    说明:

    • 结构化数据--支持

      有固定的结构和约束Schema(字段名称/类型)

      1609916555621

       

       

    • 半结构化数据--支持较为严格的半结构化数据

      有不是固定的结构和约束

      [
      {
         "name": "jack",
         "tel": "1388888888",
      },
      {
         "name": "jack",
         "tel": 13888888888,
         "age":18
      },
      {
         "name": "jack",
         "tel": "1388888888",
         "age": "18"
      }
      ]
    • 非结构数据--需要处理之后变为结构化/半结构化才支持

    如视频/图片/音频...

     

     

     

    SparkSQL数据抽象

    SparkCore的数据抽象:RDD

    SparkStreaming的数据抽象:DStream,底层是RDD

    SparkSQL的数据抽象:DataFrame和DataSet,底层是RDD

    1609917431602

     

    DataFrame

    DataFrame = RDD - 泛型 + Schema约束(指定了字段名和类型) + SQL操作 + 优化

    DataFrame 就是在RDD的基础之上做了进一步的封装,支持SQL操作!

    DataFrame 就是一个分布式表!

     

    DataSet

    DataSet = DataFrame + 泛型

    DataSet = RDD + Schema约束(指定了字段名和类型) + SQL操作 + 优化

    DataSet 就是在RDD的基础之上做了进一步的封装,支持SQL操作!

    DataSet 就是一个分布式表!

     

     

    SparkSQL实战

    实战1-加载数据成为分布式表

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    /**
    * Author itcast
    * Desc 演示SparkSQL初体验
    */
    object Demo01 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").getOrCreate()
       val sc: SparkContext = spark.sparkContext
       sc.setLogLevel("WARN")

       //TODO 1.加载数据
       val df1: DataFrame = spark.read.text("data/input/text")
       val df2: DataFrame = spark.read.json("data/input/json")
       val df3: DataFrame = spark.read.csv("data/input/csv")

       //TODO 2.处理数据

       //TODO 3.输出结果
       df1.printSchema()
       df2.printSchema()
       df3.printSchema()
       df1.show()
       df2.show()
       df3.show()

       //TODO 4.关闭资源
       spark.stop()
    }
    }

     

    案例2-将RDD转为DataFrame

    使用样例类

    package cn.itcast.sql

    import org.apache.spark
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
    * Author itcast
    * Desc 演示SparkSQL-RDD2DataFrame
    */
    object Demo02_RDD2DataFrame1 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").getOrCreate()
       val sc: SparkContext = spark.sparkContext
       sc.setLogLevel("WARN")

       //TODO 1.加载数据
       val lines: RDD[String] = sc.textFile("data/input/person.txt")

       //TODO 2.处理数据
       val personRDD: RDD[Person] = lines.map(line => {
         val arr: Array[String] = line.split(" ")
         Person(arr(0).toInt, arr(1), arr(2).toInt)
      })

       //RDD-->DF
       import spark.implicits._
       val personDF: DataFrame = personRDD.toDF()

       //TODO 3.输出结果
       personDF.printSchema()
       personDF.show()

       //TODO 4.关闭资源
       spark.stop()
    }
     case class Person(id:Int,name:String,age:Int)
    }

     

    指定类型+列名

    package cn.itcast.sql

    import cn.itcast.sql.Demo02_RDD2DataFrame1.Person
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
    * Author itcast
    * Desc 演示SparkSQL-RDD2DataFrame-指定类型和列名
    */
    object Demo02_RDD2DataFrame2 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").getOrCreate()
       val sc: SparkContext = spark.sparkContext
       sc.setLogLevel("WARN")

       //TODO 1.加载数据
       val lines: RDD[String] = sc.textFile("data/input/person.txt")

       //TODO 2.处理数据
       val tupleRDD: RDD[(Int, String, Int)] = lines.map(line => {
         val arr: Array[String] = line.split(" ")
        (arr(0).toInt, arr(1), arr(2).toInt)
      })

       //RDD-->DF
       import spark.implicits._
       val personDF: DataFrame = tupleRDD.toDF("id","name","age")

       //TODO 3.输出结果
       personDF.printSchema()
       personDF.show()

       //TODO 4.关闭资源
       spark.stop()
    }

    }

     

    自定义Schema

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}

    /**
    * Author itcast
    * Desc 演示SparkSQL-RDD2DataFrame-自定义Schema
    */
    object Demo02_RDD2DataFrame3 {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").getOrCreate()
       val sc: SparkContext = spark.sparkContext
       sc.setLogLevel("WARN")

       //TODO 1.加载数据
       val lines: RDD[String] = sc.textFile("data/input/person.txt")

       //TODO 2.处理数据
       val rowRDD: RDD[Row] = lines.map(line => {
         val arr: Array[String] = line.split(" ")
         Row(arr(0).toInt, arr(1), arr(2).toInt)
      })

       //RDD-->DF
       import spark.implicits._
       /*val schema: StructType = StructType(
             StructField("id", IntegerType, false) ::
             StructField("name", StringType, false) ::
             StructField("age", IntegerType, false) :: Nil)*/
       val schema: StructType = StructType(List(
         StructField("id", IntegerType, false),
         StructField("name", StringType, false),
         StructField("age", IntegerType, false)
      ))

       val personDF: DataFrame = spark.createDataFrame(rowRDD, schema)

       //TODO 3.输出结果
       personDF.printSchema()
       personDF.show()

       //TODO 4.关闭资源
       spark.stop()
    }

    }

     

    案例3-RDD-DF-DS相互转换

    1609921643278

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    /**
    * Author itcast
    * Desc 演示SparkSQL-RDD_DF_DS相互转换
    */
    object Demo03_RDD_DF_DS {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").getOrCreate()
       val sc: SparkContext = spark.sparkContext
       sc.setLogLevel("WARN")

       //TODO 1.加载数据
       val lines: RDD[String] = sc.textFile("data/input/person.txt")

       //TODO 2.处理数据
       val personRDD: RDD[Person] = lines.map(line => {
         val arr: Array[String] = line.split(" ")
         Person(arr(0).toInt, arr(1), arr(2).toInt)
      })

       //转换1:RDD-->DF
       import spark.implicits._
       val personDF: DataFrame = personRDD.toDF()
       //转换2:RDD-->DS
       val personDS: Dataset[Person] = personRDD.toDS()
       //转换3:DF-->RDD,注意:DF没有泛型,转为RDD时使用的是Row
       val rdd: RDD[Row] = personDF.rdd
       //转换4:DS-->RDD
       val rdd1: RDD[Person] = personDS.rdd
       //转换5:DF-->DS
       val ds: Dataset[Person] = personDF.as[Person]
       //转换6:DS-->DF
       val df: DataFrame = personDS.toDF()



       //TODO 3.输出结果
       personDF.printSchema()
       personDF.show()
       personDS.printSchema()
       personDS.show()
       rdd.foreach(println)
       rdd1.foreach(println)

       //TODO 4.关闭资源
       spark.stop()
    }
     case class Person(id:Int,name:String,age:Int)
    }

     

    案例4-SparkSQL花式查询

    1609922775976

    需求:针对personDF中的数据使用SQL和DSL两种方式进行各种查询

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
    * Author itcast
    * Desc 演示SparkSQL-SQL和DSL两种方式实现各种查询
    */
    object Demo04_Query {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").getOrCreate()
       val sc: SparkContext = spark.sparkContext
       sc.setLogLevel("WARN")

       //TODO 1.加载数据
       val lines: RDD[String] = sc.textFile("data/input/person.txt")

       //TODO 2.处理数据
       val personRDD: RDD[Person] = lines.map(line => {
         val arr: Array[String] = line.split(" ")
         Person(arr(0).toInt, arr(1), arr(2).toInt)
      })

       //RDD-->DF
       import spark.implicits._
       val personDF: DataFrame = personRDD.toDF()
       personDF.printSchema()
       personDF.show()
       /*
    root
    |-- id: integer (nullable = false)
    |-- name: string (nullable = true)
    |-- age: integer (nullable = false)

    +---+--------+---+
    | id|   name|age|
    +---+--------+---+
    | 1|zhangsan| 20|
    | 2|   lisi| 29|
    | 3| wangwu| 25|
    | 4| zhaoliu| 30|
    | 5| tianqi| 35|
    | 6|   kobe| 40|
    +---+--------+---+
        */

       //TODO ===========SQL==============
       //注册表名
       //personDF.registerTempTable("")//过期的
       //personDF.createOrReplaceGlobalTempView("")//创建全局的,夸SparkSession也可以用,但是生命周期太长!
       personDF.createOrReplaceTempView("t_person")//创建临时的,当前SparkSession也可以用

       //=1.查看name字段的数据
       spark.sql("select name from t_person").show()
       //=2.查看 name 和age字段数据
       spark.sql("select name,age from t_person").show()
       //=3.查询所有的name和age,并将age+1
       spark.sql("select name,age,age+1 from t_person").show()
       //=4.过滤age大于等于25的
       spark.sql("select name,age from t_person where age >= 25").show()
       //=5.统计年龄大于30的人数
       spark.sql("select count(*) from t_person where age > 30").show()
       //=6.按年龄进行分组并统计相同年龄的人数
       spark.sql("select age,count(*) from t_person group by age").show()
       //=7.查询姓名=张三的
       spark.sql("select name from t_person where name = 'zhangsan'").show()

       //TODO ===========DSL:面向对象的SQL==============
       //=1.查看name字段的数据
       //personDF.select(personDF.col("name"))
       personDF.select("name").show()
       //=2.查看 name 和age字段数据
       personDF.select("name","age").show()
       //=3.查询所有的name和age,并将age+1
       //personDF.select("name","age","age+1").show()//错误的:cannot resolve '`age+1`' given input columns: [age, id, name];;
       //注意$是把字符串转为了Column列对象
       personDF.select($"name",$"age",$"age" + 1).show()
       //注意'是把列名转为了Column列对象
       personDF.select('name,'age,'age + 1).show()
       //=4.过滤age大于等于25的
       personDF.filter("age >= 25").show()
       personDF.filter($"age" >= 25).show()
       personDF.filter('age >= 25).show()
       //=5.统计年龄大于30的人数
       val count: Long = personDF.where('age > 30).count() //where底层filter
       println("年龄大于30的人数为:"+count)
       //=6.按年龄进行分组并统计相同年龄的人数
       personDF.groupBy('age).count().show()
       //=7.查询姓名=张三的
       personDF.filter("name = 'zhangsan'").show()
       personDF.filter($"name"==="zhangsan").show()
       personDF.filter('name ==="zhangsan").show()
       personDF.filter('name =!="zhangsan").show()

       //TODO 3.输出结果
       //TODO 4.关闭资源
       spark.stop()
    }
     case class Person(id:Int,name:String,age:Int)
    }

    案例5-WordCount

     

    1609925579678

     

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    /**
    * Author itcast
    * Desc 演示SparkSQL-SQL和DSL两种方式实现WordCount
    */
    object Demo05_WordCount {
     def main(args: Array[String]): Unit = {
       //TODO 0.准备环境
       val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").getOrCreate()
       val sc: SparkContext = spark.sparkContext
       sc.setLogLevel("WARN")
       import spark.implicits._


       //TODO 1.加载数据
       val df: DataFrame = spark.read.text("data/input/words.txt")
       val ds: Dataset[String] = spark.read.textFile("data/input/words.txt")
       df.printSchema()
       df.show()
       ds.printSchema()
       ds.show()
       /*
    root
    |-- value: string (nullable = true)

    +----------------+
    |           value|
    +----------------+
    |hello me you her|
    |   hello you her|
    |       hello her|
    |           hello|
    +----------------+
        */
       //TODO 2.处理数据
       //df.flatMap(_.split(" "))//注意:df没有泛型,不能直接使用split
       val words: Dataset[String] = ds.flatMap(_.split(" "))
       words.printSchema()
       words.show()
       /*
       root
    |-- value: string (nullable = true)

    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    | you|
    | her|
    |hello|
    | you|
    | her|
    |hello|
    | her|
    |hello|
    +-----+
        */
       //TODO ===SQL===
       words.createOrReplaceTempView("t_words")
       val sql:String =
         """
           |select value,count(*) as counts
           |from t_words
           |group by value
           |order by counts desc
           |""".stripMargin
       spark.sql(sql).show()

       //TODO ===DSL===
       words.groupBy('value)
          .count()
          .orderBy('count.desc)
          .show()

       //TODO 3.输出结果
       //TODO 4.关闭资源
       spark.stop()
    }
    }

     

     

     

     

     

     

     

    Spark SQL

     

     

     

     

    学习目标

    1.了解SparkSQL发展历史

    2.理解SparkSQL数据抽象

    3.完成SparkSQL实战案例

    4.掌握SparkSQL自定义UDF

    5.掌握SparkOnHive

    6.掌握Spark分布式SQL引擎

     

    1.  SparkSQL 概述

    1.1   数据分析方式

    1.1.1  命令式

    在前面的课程中, 可以非常明显看到使用的到是命令式的, 主要是通过一个算子操作, 进行一些转换并得到结果,如:

    sc.textFile("...")

      .flatMap(_.split(" "))

      .map((_, 1))

      .reduceByKey(_ + _)

      .collect()

     

    ●命令式的优点

    操作粒度更细,能够控制数据的每一个处理环节

    操作更明确,步骤更清晰,容易维护

    支持半/非结构化数据的操作

    ●命令式的缺点

    需要一定的代码功底

    写起来比较麻烦

     

    1.1.2  SQL

    对于一些数据科学家/数据库管理员/DBA, 要求他们为了做一个非常简单的查询, 写一大堆代码, 明显是一件非常残忍的事情, 所以 SQL on Hadoop 是一个非常重要的方向.

    SELECT

       name,

       age,

       school

    FROM students

    WHERE age > 10

     

    ●SQL的优点

    表达非常清晰, 比如说这段 SQL 明显就是为了查询三个字段,条件是查询年龄大于 10 岁的

     

    ●SQL的缺点

    试想一下3层嵌套的 SQL维护起来应该挺力不从心的吧

    试想一下如果使用SQL来实现机器学习算法也挺为难的吧

     

    1.1.3  总结

     

    SQL 擅长数据分析和通过简单的语法表示查询,命令式操作适合过程式处理和算法性的处理.

    在 Spark 出现之前,对于结构化数据的查询和处理, 一个工具一向只能支持命令式如MR或者只能使用SQL如Hive,开发者被迫要使用多个工具来适应两种场景,并且多个工具配合起来比较费劲.

    而 Spark 出现了以后,提供了两种数据处理范式:RDD的命令式和SparkSQL的SQL式,是一种革新性的进步!

     

    1.2  SparkSQL前世今生

        

    1.2.1  Shark框架-淘汰了

    首先回顾SQL On Hadoopp框架:Hive(可以说Hive时大数据生态系统中第一个SQL框架),架构如下所示:

     

     

     

     

    可以发现Hive框架底层就是MapReduce,所以在Hive中执行SQL时,往往很慢很慢。

     

     

     

     

    Spark出现以后,将HiveQL语句翻译成基于RDD操作,此时Shark框架诞生了。

     

     

     

     

    Spark SQL的前身是Shark,它发布时Hive可以说是SQL on Hadoop的唯一选择(Hive负责将SQL编译成可扩展的MapReduce作业),鉴于Hive的性能以及与Spark的兼容,Shark由此而生。Shark即Hive on Spark,本质上是通过Hive的HQL进行解析,把HQL翻译成Spark上对应的RDD操作,然后通过Hive的Metadata获取数据库里表的信息,实际为HDFS上的数据和文件,最后有Shark获取并放到Spark上计算。

    但是Shark框架更多是对Hive的改造,替换了Hive的物理执行引擎,使之有一个较快的处理速度。然而不容忽视的是Shark继承了大量的Hive代码,因此给优化和维护带来大量的麻烦。为了更好的发展,Databricks在2014年7月1日Spark Summit上宣布终止对Shark的开发,将重点放到SparkSQL模块上。

    文档:https://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html

     

     

     

     

    SparkSQL模块主要将以前依赖Hive框架代码实现的功能自己实现,称为Catalyst引擎

     

     

     

     

    1.2.2  SparkSQL诞生

    从Spark框架1.0开始发布SparkSQL模块开发,直到1.3版本发布SparkSQL Release版本可以在生产环境使用,再到Spark 2.0版本SparkSQL才算真正稳定,在实际开发中发挥者其巨大的作用,SparkSQL的发展经历如下几个阶段:

     

     

     

     

    ü Spark1.0之前:没有SparkSQL, 有一个开源项目shark底层使用Spark作为Hive的执行引擎

    ü Spark1.0时:Spark自己开发了新的组件:SparkSQL, 使用SchemaRDD对RDD进行封装,用来表示带有约束的RDDD

    ü Spark1.3时:SparkSQL开发了新的数据抽象:DataFrame:底层实现了RDD的大部分功能,并增加了SQL操作,不再强依赖RDD,但不支持泛型

    ü Spark1.6时:SparkSQL开发了新的数据抽象:DataSet:支持泛型

    ü Spark2.0时:统一了DataFrame和DataSet: DataSet[Row] = DataFrame,且增加了新的组件:StructuredStreaming支持SQL处理流数据

     

    注意:

    1.SparkSQL发展历史较为曲折,走了很多弯路,但也都是必须的,因为要兼容其他语言

    2.尽管SparkSQL的API很多(且支持很多编程语言),但是使用起来都很友好,底层也都做了很多的性能优化!

     

    1.3  SparkSQL官方定义

    http://spark.apache.org/sql/

     

     

     

    注意:不同数据格式

    1.非结构化数据--SparkSQL不能直接处理

    图片/视频/音频.....

    2.结构化数据--SparkSQL可以处理

    类似于数据库表.有结构有Schema约束

     

     

     

    3.半结构化数据--SparkSQL可以处理较为规范的半结构化数据

    有结构,但是无固定的Schema

    [

    {

        "name": "jack",

        "tel": "1388888888",

    },

    {

        "name": "jack",

        "tel": "1388888888",

        "age":18

    },

    {

        "name": "jack",

        "tel": "1388888888",

        "age": "18"

    }

    ]

     

    l SparkSQL的特点

     

     

     

     

    2.  SparkSQL数据抽象

    2.1  DataFrame

    2.1.1  引入

    就易用性而言,对比传统的MapReduce API,Spark的RDD API有了数量级的飞跃并不为过。然而,对于没有MapReduce和函数式编程经验的新手来说,RDD API仍然存在着一定的门槛。

    另一方面,数据科学家们所熟悉的R、Pandas等传统数据框架虽然提供了直观的API,却局限于单机处理,无法胜任大数据场景。

    为了解决这一矛盾,Spark SQL 1.3在Spark1.0原有SchemaRDD的基础上提供了与R和Pandas风格类似的DataFrame API。

    新的DataFrame AP不仅可以大幅度降低普通开发者的学习门槛,同时还支持Scala、Java与Python三种语言。更重要的是,由于脱胎自SchemaRDD,DataFrame天然适用于分布式大数据场景。

    注意:

    DataFrame它不是Spark SQL提出来的,而是早在R、Pandas语言中就已经有了的。

     

    2.1.2  DataFrame是什么

     

     

     

     

    在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

    DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

     

    使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行针对性的优化,最终达到大幅提升运行时效率。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

     

     

     

     

    上图中左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而中间的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。了解了这些信息之后,Spark SQL的查询优化器就可以进行针对性的优化。后者由于在编译期有详尽的类型信息,编译期就可以编译出更加有针对性、更加优化的可执行代码。

     

    l Schema 信息

    查看DataFrame中Schema是什么,执行如下命令:

    df.schema

    Schema信息封装在StructType中,包含很多StructField对象

     

     

     

     

     

     

     

     

     

    l Row

    DataFrame中每条数据封装在Row中,Row表示每行数据

    如何构建Row对象:要么是传递value,要么传递Seq,官方实例代码:

    import org.apache.spark.sql._

     

    // Create a Row from values.

    Row(value1, value2, value3, ...)

     

    // Create a Row from a Seq of values.

    Row.fromSeq(Seq(value1, value2, ...))

    如何获取Row中每个字段的值呢????

    方式1:下标获取,从0开始,类似数组下标获取

     

     

     

    方式2:指定下标,知道类型

     

     

     

    方式3:通过As转换类型

     

     

     

     

     

    2.2  Dataset

    2.2.1  引入

    Spark在Spark 1.3版本中引入了Dataframe,DataFrame是组织到命名列中的分布式数据集合,但是有如下几点限制:

    l  编译时类型安全: 

    Dataframe API不支持编译时安全性,这限制了在结构不知道时操纵数据。

    以下示例在编译期间有效。但是,执行此代码时将出现运行时异常。

     

     

     

    l  无法对域对象(丢失域对象)进行操作:

    将域对象转换为DataFrame后,无法从中重新生成它;

    下面的示例中,一旦我们从personRDD创建personDF,将不会恢复Person类的原始RDD(RDD [Person]);

     

     

     

     

    基于上述的两点,从Spark 1.6开始出现Dataset,至Spark 2.0中将DataFrame与Dataset合并,其中DataFrame为Dataset特殊类型,类型为Row。

     

     

     

     

    针对RDD、DataFrame与Dataset三者编程比较来说,Dataset API无论语法错误和分析错误在编译时都能发现。

     

     

     

     

    此外RDD与Dataset相比较而言,由于Dataset数据使用特殊编码,所以在存储数据时更加节省内存。

     

     

     

     

     

    2.2.2  Dataset 是什么

     

     

     

    Spark 框架从最初的数据结构RDD、到SparkSQL中针对结构化数据封装的数据结构DataFrame,最终使用Dataset数据集进行封装,发展流程如下。

     

     

     

     

    Dataset是Spark 1.6推出的最新的数据抽象,可以理解为是DataFrames的扩展,它提供了一种类型安全的,面向对象的编程接口。

    从Spark 2.0开始,DataFrame与Dataset合并,每个Dataset也有一个被称为一个DataFrame的类型化视图,这种DataFrame是Row类型的Dataset,即Dataset[Row]。DataFrame = DataSet[Row]

    Dataset结合了RDD和DataFrame的优点:

    与RDD相比:Dataset保存了更多的描述信息,概念上等同于关系型数据库中的二维表;

    与DataFrame相比:Dataset保存了类型信息,是强类型的,提供了编译时类型检查,调用Dataset的方法先会生成逻辑计划,然后被Spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行;

    所以在实际项目中建议使用Dataset进行数据封装,数据分析性能和数据存储更加好。

     

    2.3  总结:RDD、DataFrame和Dataset

     

     

     

     

          SparkSQL中常见面试题:如何理解Spark中三种数据结构RDD、DataFrame和Dataset关系?

    RDD

    ü RDD(Resilient Distributed Datasets)叫做弹性分布式数据集,是Spark中最基本的数据抽象,源码中是一个抽象类,代表一个不可变、可分区、里面的元素可并行计算的集合。

    ü 编译时类型安全,但是无论是集群间的通信,还是IO操作都需要对对象的结构和数据进行序列化和反序列化,还存在较大的GC的性能开销,会频繁的创建和销毁对象。

    DataFrame

    ü 与RDD类似,DataFrame是一个分布式数据容器,不过它更像数据库中的二维表格,除了数据之外,还记录这数据的结构信息(即schema)。

    ü DataFrame也是懒执行的,性能上要比RDD高(主要因为执行计划得到了优化)。

    ü 由于DataFrame每一行的数据结构一样,且存在schema中,Spark通过schema就能读懂数据,因此在通信和IO时只需要序列化和反序列化数据,而结构部分不用。

    ü Spark能够以二进制的形式序列化数据到JVM堆以外(off-heap:非堆)的内存,这些内存直接受操作系统管理,也就不再受JVM的限制和GC的困扰了。但是DataFrame不是类型安全的。

    Dataset

    ü Dataset是Spark1.6中对DataFrame API的一个扩展,是Spark最新的数据抽象,结合了RDD和DataFrame的优点。

    ü DataFrame=Dataset[Row](Row表示表结构信息的类型),DataFrame只知道字段,但是不知道字段类型,而Dataset是强类型的,不仅仅知道字段,而且知道字段类型。

    ü 样例类CaseClass被用来在Dataset中定义数据的结构信息,样例类中的每个属性名称直接对应到Dataset中的字段名称。

    ü Dataset具有类型安全检查,也具有DataFrame的查询优化特性,还支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。

    3.  SparkSQL实战

    3.1  案例1:SparkSQL初体验

    3.1.1  准备工作

    l 添加依赖

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-sql_2.11</artifactId>

        <version>2.4.5</version>

    </dependency>

     

    l SparkSession 应用入口

    Spark 2.0开始,SparkSession取代了原本的SQLContext与HiveContext作为SparkSQL应用程序的入口,可以加载不同数据源的数据,封装到DataFrame/Dataset集合数据结构中,使得编程更加简单,程序运行更加快速高效。注意原本的SQLContext与HiveContext仍然保留,以支持向下兼容。

    http://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession

     

    l SparkSession对象实例通过建造者模式构建,代码如下:

    其中

    ①表示导入SparkSession所在的包,

    ②表示建造者模式构建对象和设置属性,

    ③表示导入SparkSession类中implicits对象object中隐式转换函数。

     

     

     

     

    3.1.2  代码演示

    package cn.itcast.sql


    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL
     */
    object SparkSQLDemo00_hello {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        println(this.getClass.getSimpleName)

        println(this.getClass.getSimpleName.stripSuffix("$"))
        val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")

        val df1: DataFrame = spark.read.text("data/input/text")
        val df2: DataFrame = spark.read.json("data/input/json")
        val df3: DataFrame = spark.read.csv("data/input/csv")
        val df4: DataFrame = spark.read.parquet("data/input/parquet")

        df1.printSchema()
        df1.show(false)
        df2.printSchema()
        df2.show(false)
        df3.printSchema()
        df3.show(false)
        df4.printSchema()
        df4.show(false)


        df1.coalesce(1).write.mode(SaveMode.Overwrite).text("data/output/text")
        df2.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json")
        df3.coalesce(1).write.mode(SaveMode.Overwrite).csv("data/output/csv")
        df4.coalesce(1).write.mode(SaveMode.Overwrite).parquet("data/output/parquet")

        //关闭资源
        sc.stop()

        spark.stop()
      }
    }

     

    3.2  案例2:获取DataFrame/DataSet

    http://spark.apache.org/docs/latest/sql-getting-started.html#interoperating-with-rdds

         实际项目开发中,往往需要将RDD数据集转换为DataFrame,本质上就是给RDD加上Schema信息

    3.2.1  使用样例类

    当RDD中数据类型CaseClass样例类时,底层可以通过反射Reflecttion获取属性名称和类型,构建Schema,应用到RDD数据集,将其转换为DataFrame。

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL-RDD-->DataFrame/DataSet使用样例类
     */
    object SparkSQLDemo02_CreateDFDS1 {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        //注意:在新版的Spark,使用SparkSession来进行SparkSQL开发!
        //因为SparkSession封装了SqlContextHiveContextSparkContext
        val spark: SparkSession = SparkSession.builder().appName("hello").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")

        //2.获取RDD
        val fileRDD: RDD[String] = sc.textFile("data/input/person.txt")

        val personRDD: RDD[Person] = fileRDD.map(line => {
          val arr: Array[String] = line.split(" ")
          Person(arr(0).toInt, arr(1), arr(2).toInt)
        })
        //3.RDD->DataFrame/DataSet
        import spark.implicits._ //隐式转换
        val df: DataFrame = personRDD.toDF()

        val ds: Dataset[Person] = personRDD.toDS()

        //4.输出约束和类型
        df.printSchema()

        df.show()

        ds.printSchema()
        ds.show()

        //5.关闭资源
        sc.stop()

        spark.stop()
      }
      case class Person(id:Int,name:String,age:Int)
    }

     

    此种方式要求RDD数据类型必须为CaseClass,转换的DataFrame中字段名称就是CaseClass中属性名称。

    3.2.2  指定类型+列名

    SparkSQL中提供一个函数:toDF,通过指定列名称,将数据类型为元组的RDD或Seq转换为DataFrame,实际开发中也常常使用。

     

     

     

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL-RDD-->DataFrame/DataSet-指定类型和列名
     */
    object SparkSQLDemo02_CreateDFDS2 {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        //注意:在新版的Spark,使用SparkSession来进行SparkSQL开发!
        //因为SparkSession封装了SqlContextHiveContextSparkContext
        val spark: SparkSession = SparkSession.builder().appName("hello").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")

        //2.获取RDD
        val fileRDD: RDD[String] = sc.textFile("data/input/person.txt")

        //tupleRDD: RDD[(Int, String, Int)]--指定类型:(Int, String, Int)
        val tupleRDD: RDD[(Int, String, Int)] = fileRDD.map(line => {

          val arr: Array[String] = line.split(" ")
          (arr(0).toInt, arr(1), arr(2).toInt)
        })

        //3.RDD->DataFrame/DataSet
        import spark.implicits._ //隐式转换
        //指定列名
        val df: DataFrame = tupleRDD.toDF("id","name","age")


        //4.输出约束和类型
        df.printSchema()

        df.show()

        //5.关闭资源
        sc.stop()

        spark.stop()
      }
    }

     

    3.2.3  自定义Schema

    依据RDD中数据自定义Schema,类型为StructType,每个字段的约束使用StructField定义,具体步骤如下:

    1、RDD中数据类型为Row:RDD[Row]

    2、针对Row中数据定义Schema:StructType

    3、使用SparkSession中方法将定义的Schema应用到RDD[Row]上;

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL-RDD-->DataFrame/DataSet-自定义Schema
     */
    object SparkSQLDemo02_CreateDFDS3 {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        //注意:在新版的Spark,使用SparkSession来进行SparkSQL开发!
        //因为SparkSession封装了SqlContextHiveContextSparkContext
        val spark: SparkSession = SparkSession.builder().appName("hello").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")

        //2.获取RDD
        val fileRDD: RDD[String] = sc.textFile("data/input/person.txt")

        //准备rowRDD:RDD[Row]
        val rowRDD: RDD[Row] = fileRDD.map(line => {

          val arr: Array[String] = line.split(" ")
          Row(arr(0).toInt, arr(1), arr(2).toInt)
        })

        //准备Schema
        /*val schema: StructType = StructType(
              StructField("id", IntegerType, true) ::
              StructField("name", StringType, true) ::
              StructField("age", IntegerType, true) :: Nil)*/
        val schema: StructType = StructType(

          List(
            StructField("id", IntegerType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true)
          )
        )

        //3.RDD->DataFrame/DataSet
        import spark.implicits._ //隐式转换
        val df: DataFrame = spark.createDataFrame(rowRDD, schema)



        //4.输出约束和类型
        df.printSchema()

        df.show()

        //5.关闭资源
        sc.stop()

        spark.stop()
      }
    }

     

    此种方式可以更加体会到DataFrame = RDD[Row] + Schema组成,在实际项目开发中灵活的选择方式将RDD转换为DataFrame。

    3.3  案例3:RDD、DFDS相互转换

    3.3.1  转换API

    实际项目开发中,常常需要对RDD、DataFrame及Dataset之间相互转换,其中要点就是Schema约束结构信息。

    l 图解

     

     

     

     

    l API

     1、RDD转换DataFrame或者Dataset

    转换DataFrame时,定义Schema信息,两种方式

    转换为Dataset时,不仅需要Schema信息,还需要RDD数据类型为CaseClass类型

     2、Dataset或DataFrame转换RDD

    由于Dataset或DataFrame底层就是RDD,所以直接调用rdd函数即可转换

    dataframe.rdd 或者dataset.rdd

    3、DataFrame与Dataset之间转换

    由于DataFrame为Dataset特例,所以Dataset直接调用toDF函数转换为DataFrame

    当将DataFrame转换为Dataset时,使用函数as[Type],指定CaseClass类型即可。

     

    l RDD、DataFrame和DataSet之间的转换如下:

    RDD转换到DataFrame:rdd.toDF(“name”)

    RDD转换到Dataset:rdd.map(x => Emp(x)).toDS

    DataFrame转换到Dataset:df.as[Emp]

    DataFrame转换到RDD:df.rdd

    Dataset转换到DataFrame:ds.toDF

    Dataset转换到RDD:ds.rdd

     

     

    l 注意:

    RDD与DataFrame或者DataSet进行操作,都需要引入隐式转换import spark.implicits._,其中的spark是SparkSession对象的名称!

     

    3.3.2  代码实现

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL-RDD/DataFrame/DataSet相互转换
     */
    object SparkSQLDemo03_Transformation {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        val spark: SparkSession = SparkSession.builder().appName("hello").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")

        //2.获取RDD
        val fileRDD: RDD[String] = sc.textFile("data/input/person.txt")

        val personRDD: RDD[Person] = fileRDD.map(line => {
          val arr: Array[String] = line.split(" ")
          Person(arr(0).toInt, arr(1), arr(2).toInt)
        })

        import spark.implicits._ //隐式转换

        //3.相互转换
        // RDD->DF
        val df: DataFrame = personRDD.toDF()

        // RDD->DS
        val ds: Dataset[Person] = personRDD.toDS()

        // DF->RDD
        val rdd: RDD[Row] = df.rdd //注意:rdd->df的时候泛型丢了,所以df->rdd的时候就不知道原来的泛型了,给了个默认的
        // DF->DS
        val ds2: Dataset[Person] = df.as[Person] //df添加上泛型
        // DS->RDD
        val rdd2: RDD[Person] = ds.rdd
        // DS->DF
        val df2: DataFrame = ds.toDF()


        //4.输出约束和类型
        df.printSchema()

        df.show()

        ds.printSchema()
        ds.show()

        //5.关闭资源
        sc.stop()

        spark.stop()
      }
      case class Person(id:Int,name:String,age:Int)
    }

     

     

    3.4  案例4:SparkSQL花式查询

         在SparkSQL模块中,将结构化数据封装到DataFrame或Dataset集合中后,提供了两种方式分析处理数据:

    1、SQL 编程,将DataFrame/Dataset注册为临时视图或表,编写SQL语句,类似HiveQL;

    2、DSL(domain-specific language)编程,调用DataFrame/Dataset API(函数),类似RDD中函数;

    3.4.1  基于SQL分析

    将Dataset/DataFrame注册为临时视图,编写SQL执行分析,分为两个步骤:

    1、注册为临时视图

     

     

     

    2、编写SQL,执行分析

     

     

     

     

    其中SQL语句类似Hive中SQL语句,查看Hive官方文档,SQL查询分析语句语法,官方文档文档:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

     

     

     

     

    3.4.2  基于DSL分析

    调用DataFrame/Dataset中API(函数)分析数据,其中函数包含RDD中转换函数和类似SQL语句函数,部分截图如下:

     

     

     

     

    类似SQL语法函数:调用Dataset中API进行数据分析,Dataset中涵盖很多函数,大致分类如下:

    l  1、选择函数select:选取某些列的值

     

     

     

    l  2、过滤函数filter/where:设置过滤条件,类似SQL中WHERE语句

     

     

     

     

    l  3、分组函数groupBy/rollup/cube:对某些字段分组,在进行聚合统计

     

     

     

     

    l  4、聚合函数agg:通常与分组函数连用,使用一些count、max、sum等聚合函数操作

     

     

     

     

    l  5、排序函数sort/orderBy:按照某写列的值进行排序(升序ASC或者降序DESC)

     

     

     

     

    l  6、限制函数limit:获取前几条数据,类似RDD中take函数

     

     

     

     

    l  7、重命名函数withColumnRenamed:将某列的名称重新命名

     

     

     

     

    l  8、删除函数drop:删除某些列

     

     

     

     

    l  9、增加列函数withColumn:当某列存在时替换值,不存在时添加此列

     

     

     

     

    上述函数在实际项目中经常使用,尤其数据分析处理的时候,其中要注意,调用函数时,通常指定某个列名称,传递Column对象,通过隐式转换转换字符串String类型为Column对象

     

     

     

    Dataset/DataFrame中转换函数,类似RDD中Transformation函数,使用差不多:

     

     

     

     

     

    3.4.3  花式查询代码演示

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL-花式查询-SQL风格和DSL风格
     */
    object SparkSQLDemo04_FlowerQuery {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        val spark: SparkSession = SparkSession.builder().appName("hello").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")

        //2.获取RDD
        val fileRDD: RDD[String] = sc.textFile("data/input/person.txt")

        val personRDD: RDD[Person] = fileRDD.map(line => {
          val arr: Array[String] = line.split(" ")
          Person(arr(0).toInt, arr(1), arr(2).toInt)
        })
        //3.RDD->DataFrame
        import spark.implicits._ //隐式转换
        val df: DataFrame = personRDD.toDF()


        //4.输出约束和类型
        df.printSchema()

        df.show()

        //TODO =============花式查询============
        println("===========SQL风格========")

        //-1.注册表
        //df.registerTempTable("t_person")
        //df.createOrReplaceGlobalTempView("t_person")//创建一个全局的视图/,所有SparkSession可用--生命周期太长
        df.createOrReplaceTempView("t_person") //创建一个临时视图/,SparkSession可用
        //-2.各种查询
        //=1.查看name字段的数据
        spark.sql("select name from t_person").show(false)

        //=2.查看 name age字段数据
        spark.sql("select name,age from t_person").show(false)

        //=3.查询所有的nameage,并将age+1
        spark.sql("select name,age,age+1 from t_person").show(false)

        //=4.过滤age大于等于25
        spark.sql("select id,name,age from t_person where age >= 25").show(false)

        //=5.统计年龄大于30的人数
        spark.sql("select count(*) from t_person where age > 30").show(false)

        //=6.按年龄进行分组并统计相同年龄的人数
        spark.sql("select age,count(*) from t_person group by age").show(false)

        //=7.查询姓名=张三的
        val name = "zhangsan"

        spark.sql("select id,name,age from t_person where name='zhangsan'").show(false)
        spark.sql(s"select id,name,age from t_person where name='${name}'").show(false)

        println("===========DSL风格========")
        //=1.查看name字段的数据
        df.select(df.col("name")).show(false)

        import org.apache.spark.sql.functions._
        df.select(col("name")).show(false)
        df.select("name").show(false)

        //=2.查看 name age字段数据
        df.select("name", "age").show(false)


        //=3.查询所有的nameage,并将age+1
        //df.select("name","age","age+1").show(false)//报错:没有"age+1"这个列名
        //df.select("name","age","age"+1).show(false)//报错:没有"age+1"这个列名
        df.select($"name", $"age", $"age" + 1).show(false) //$"age"表示获取该列的值/$"列名"表示将该列名字符串转为列对象
        df.select('name, 'age, 'age + 1).show(false) //'列名表示将该列名字符串转为列对象

        //=4.过滤age大于等于25
        df.filter("age >= 25").show(false)

        df.where("age >= 25").show(false)

        //=5.统计年龄大于30的人数
        val count: Long = df.filter("age > 30").count()

        println("年龄大于30的人数"+count)

        //=6.按年龄进行分组并统计相同年龄的人数
        df.groupBy("age").count().show(false)


        //=7.查询姓名=张三的
        df.filter("name ='zhangsan'").show(false)

        df.where("name ='zhangsan'").show(false)
        df.filter($"name" === "zhangsan").show(false)
        df.filter('name === "zhangsan").show(false)
        //=8.查询姓名!=张三的
        df.filter($"name" =!= name).show(false)

        df.filter('name =!= "zhangsan").show(false)


        //TODO =============花式查询============

        //5.关闭资源
        sc.stop()

        spark.stop()
      }

      case class Person(id: Int, name: String, age: Int)

    }

     

    3.5  案例5:SparkSQL实现WordCount

    3.5.1  需求

    使用SparkSQL的SQL和DSL两种方式完成WordCount

    3.5.2  代码实现

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL-完成WordCount
     */
    object SparkSQLDemo05_WordCount {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        val spark: SparkSession = SparkSession.builder().appName("hello").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._

        //2.获取DF/DS
        //获取DF/DS方式一:通过RDD->DF/DS
        val fileRDD: RDD[String] = sc.textFile("data/input/words.txt")

        val df: DataFrame = fileRDD.toDF("value")
        val ds: Dataset[String] = df.as[String]
        df.printSchema()
        df.show(false)
        ds.printSchema()
        ds.show(false)

        //获取DF/DS方式二:
        val df2: DataFrame = spark.read.text("data/input/words.txt")

        df2.printSchema()
        df2.show(false)
        val ds2: Dataset[String] = spark.read.textFile("data/input/words.txt")
        ds2.printSchema()
        ds2.show(false)
        /*
        root
       |-- value: string (nullable = true)

      +----------------+
      |value           |
      +----------------+
      |hello me you her|
      |hello you her   |
      |hello her       |
      |hello           |
      +----------------+
         */

        //3.计算WordCount
        //df.flatMap(_.split(" ")) //报错:DF没有泛型,不知道_String
        //df2.flatMap(_.split(" "))//报错:DF没有泛型,不知道_String
        val wordDS: Dataset[String] = ds.flatMap(_.split(" "))

        //ds2.flatMap(_.split(" "))

        wordDS.printSchema()

        wordDS.show(false)
        /*
        +-----+
        |value|
        +-----+
        |hello|
        |me   |
        |you  |
        ....
         */

        //TODO SQL风格
        wordDS.createOrReplaceTempView("t_words")

        val sql: String =
          """
            |select value as word,count(*) as counts
            |from t_words
            |group by word
            |order by counts desc
            |""".stripMargin
        spark.sql(sql).show(false)

        //TODO DSL风格
        wordDS.groupBy("value")

          .count()
          .orderBy('count.desc)
          .show(false)
        /*
        +-----+------+
        |word |counts|
        +-----+------+
        |hello|4     |
        |her  |3     |
        |you  |2     |
        |me   |1     |
        +-----+------+

        +-----+-----+
        |value|count|
        +-----+-----+
        |hello|4    |
        |her  |3    |
        |you  |2    |
        |me   |1    |
        +-----+-----+
         */


        //4.关闭资源
        sc.stop()

        spark.stop()
      }

      case class Person(id: Int, name: String, age: Int)

    }

    注意:

    1.从上述的案例可以发现将数据封装到Dataset/DataFrame中,进行处理分析,更加方便简洁

    2.无论使用DSL还是SQL编程方式,底层转换为RDD操作都是一样,性能一致,查看WEB UI监控中Job运行对应的DAG图如下:

     

     

     

     

    3.6  案例6:SparkSQL实现电影评分数据分析

    3.6.1  数据

     

     

     

     

    3.6.2  需求

    对电影评分数据进行统计分析,分别使用DSL编程和SQL编程,获取电影平均分Top10,要求电影的评分次数大于200

     

    3.6.3  步骤

    1、读取电影评分数据,从本地文件系统读取

    2、转换数据,指定Schema信息,封装到DataFrame

    3、基于SQL方式分析

    4、基于DSL方式分析

     

    3.6.4  代码实现

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL- 统计评分次数>200的电影的平均分最高的Top10
     */
    object SparkSQLDemo06_MovieTop10 {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._

        //2.获取DF/DS
        //也可以用rdd-->df
        val fileDS: Dataset[String] = spark.read.textFile("data/input/rating_100k.data")

        val rowDS: Dataset[(Int, Int)] = fileDS.map(line => {
          val arr: Array[String] = line.split(" ")
          (arr(1).toInt, arr(2).toInt)
        })
        val cleanDF: DataFrame = rowDS.toDF("mid","score")
        cleanDF.printSchema()
        cleanDF.show(false)
        /*
        +----+-----+
        |mid |score|
        +----+-----+
        |242 |3    |
        |302 |3    |
        |377 |1    |
        |51  |2    |
        |346 |1    |
          ...
        */

        //3.完成需求:统计评分次数>200的电影的平均分最高的Top10
        //TODO SQL
        cleanDF.createOrReplaceTempView("t_scores")

        val sql:String =
          """
            |select mid, round(avg(score),2) avg,count(*) counts
            |from t_scores
            |group by mid
            |having counts > 200
            |order by avg desc,counts desc
            |limit 10
            |""".stripMargin
        spark.sql(sql).show(false)

        //TODO DSL
        import org.apache.spark.sql.functions._

        cleanDF
          .groupBy("mid")
            .agg(
              round(avg('score),2) as "avg",
              count('mid) as "counts"
            )//聚合函数可以写在这里
            .orderBy('avg.desc,'counts.desc)

            .filter('counts > 200)
            .limit(10)
            .show(false)
        /*
        +---+----+------+
        |mid|avg |counts|
        +---+----+------+
        |318|4.47|298   |
        |483|4.46|243   |
        |64 |4.45|283   |
        |12 |4.39|267   |
        |603|4.39|209   |
        |50 |4.36|583   |
        |98 |4.29|390   |
        |357|4.29|264   |
        |427|4.29|219   |
        |127|4.28|413   |
        +---+----+------+
         */

        //4.关闭资源
        sc.stop()

        spark.stop()
      }
    }

     

    3.6.5  Shuffle分区数

    运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。

     

     

     

    原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。可以在构建SparkSession实例对象时进行设置

    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      // TODO: 设置shuffle时分区数目
      .config("spark.sql.shuffle.partitions", "4")

      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

     

    3.6.6  扩展阅读:Catalyst 优化器

    在【案例:电影评分数据分析】中,运行应用程序代码,通过WEB UI界面监控可以看出,无论使用DSL还是SQL,构建Job的DAG图一样的,性能是一样的,原因在于SparkSQL中引擎:Catalyst:将SQL和DSL转换为相同逻辑计划。

     

     

     

     

     

     

     

        Spark SQL是Spark技术最复杂的组件之一,Spark SQL的核心是Catalyst优化器,它以一种新颖的方式利用高级编程语言功能(例如Scala的模式匹配和quasiquotes)来构建可扩展的查询优化器。

     

     

     

     

    SparkSQL的Catalyst优化器是整个SparkSQL pipeline的中间核心部分,其执行策略主要两方向:

    l  基于规则优化/Rule Based Optimizer/RBO;

    l  基于代价优化/Cost Based Optimizer/CBO;

     

     

     

     

    从上图可见,无论是直接使用SQL语句还是使用 ataFrame,都会经过一些列步骤转换成DAG对RDD的操作。

    Catalyst工作流程:SQL语句首先通过Parser模块被解析为语法树,此棵树称为Unresolved Logical Plan;Unresolved Logical Plan通过Analyzer模块借助于数据元数据解析为Logical Plan;此时再通过各种基于规则的Optimizer进行深入优化,得到Optimized Logical Plan;优化后的逻辑执行计划依然是逻辑的,需要将逻辑计划转化为Physical Plan。

     

     

     

     

    Catalyst的三个核心点:

    1、Parser,第三方类库ANTLR实现。将sql字符串切分成Token,根据语义规则解析成一颗AST语法树;

    2、Analyzer,Unresolved Logical Plan,进行数据类型绑定和函数绑定;

    3、Optimizer,规则优化就是模式匹配满足特定规则的节点等价转换为另一颗语法树;

     

     

     

     

     

     

    3.7  案例7:External DataSource

    在SparkSQL模块,提供一套完成API接口,用于方便读写外部数据源的的数据(从Spark 1.4版本提供),框架本身内置外部数据源:

     

     

     

     

    在Spark 2.4版本中添加支持Image Source(图像数据源)和Avro Source。

     

    3.7.1  数据源与格式

         数据分析处理中,数据可以分为结构化数据、非结构化数据及半结构化数据。

     

     

     

     

    1)、结构化数据(Structured)

    n 结构化数据源可提供有效的存储和性能。例如,Parquet和ORC等柱状格式使从列的子集中提取值变得更加容易。

    n 基于行的存储格式(如Avro)可有效地序列化和存储提供存储优势的数据。然而,这些优点通常以灵活性为代价。如因结构的固定性,格式转变可能相对困难。

    2)、非结构化数据(UnStructured)

    n 相比之下,非结构化数据源通常是自由格式文本或二进制对象,其不包含标记或元数据以定义数据的结构。

    n 报纸文章,医疗记录,图像,应用程序日志通常被视为非结构化数据。这些类型的源通常要求数据周围的上下文是可解析的。

    3)、半结构化数据(Semi-Structured)

    n 半结构化数据源是按记录构建的,但不一定具有跨越所有记录的明确定义的全局模式。每个数据记录都使用其结构信息进行扩充。

    n 半结构化数据格式的好处是,它们在表达数据时提供了最大的灵活性,因为每条记录都是自我描述的。但这些格式的主要缺点是它们会产生额外的解析开销,并且不是特别为ad-hoc(特定)查询而构建的。

    3.7.1.1  text 数据

    SparkSession加载文本文件数据,提供两种方法,返回值分别为DataFrame和Dataset,前面【WordCount】中已经使用,下面看一下方法声明:

     

     

     

     

    可以看出textFile方法底层还是调用text方法,先加载数据封装到DataFrame中,再使用as[String]方法将DataFrame转换为Dataset,实际中推荐使用textFile方法,从Spark 2.0开始提供。

    无论是text方法还是textFile方法读取文本数据时,一行一行的加载数据,每行数据使用UTF-8编码的字符串,列名称为【value】。

     

     

     

    3.7.1.2  json 数据

    实际项目中,有时处理数据以JSON格式存储的,尤其后续结构化流式模块:StructuredStreaming,从Kafka Topic消费数据很多时间是JSON个数据,封装到DataFrame中,需要解析提取字段的值。以读取github操作日志JSON数据为例,数据结构如下:

     

     

     

     

    1、操作日志数据使用GZ压缩:2015-03-01-11.json.gz,先使用json方法读取。

    2、使用textFile加载数据,对每条JSON格式字符串数据,使用SparkSQL函数库functions中自带get_json_obejct函数提取字段:id、type、public和created_at的值。

    n 函数:get_json_obejct使用说明

     

     

     

     

    示例代码:

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    /**
     * SparkSQL读取JSON格式文本数据
     */
    object SparkSQLJson {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[*]")
          // 通过装饰模式获取实例对象,此种方式为线程安全的
          .getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._

        // TODO: LocalFS上读取json格式数据(压缩)
        val jsonDF: DataFrame = spark.read.json("data/input/2015-03-01-11.json.gz")

        //jsonDF.printSchema()
        jsonDF.show(5, truncate = true)


        println("===================================================")
        val githubDS: Dataset[String] = spark.read.textFile("data/input/2015-03-01-11.json.gz")
        //githubDS.printSchema() // value 字段名称,类型就是String
        githubDS.show(5,truncate = true)


        // TODO:使用SparkSQL自带函数,针对JSON格式数据解析的函数
        import org.apache.spark.sql.functions._

        // 获取如下四个字段的值:idtypepubliccreated_at
        val gitDF: DataFrame = githubDS.select(

          get_json_object($"value", "$.id").as("id"),
          get_json_object($"value", "$.type").as("type"),
          get_json_object($"value", "$.public").as("public"),
          get_json_object($"value", "$.created_at").as("created_at")
        )
        gitDF.printSchema()
        gitDF.show(10, truncate = false)

        // 应用结束,关闭资源
        spark.stop()

      }
    }

     

    运行结果:

     

     

     

     

     

    3.7.1.3  csv 数据

    在机器学习中,常常使用的数据存储在csv/tsv文件格式中,所以SparkSQL中也支持直接读取格式数据,从2.0版本开始内置数据源。关于CSV/TSV格式数据说明:

     

     

     

     

    SparkSQL中读取CSV格式数据,可以设置一些选项,重点选项:

    1、分隔符:sep

    默认值为逗号,必须单个字符

    2、数据文件首行是否是列名称:header

    默认值为false,如果数据文件首行是列名称,设置为true

    3、是否自动推断每个列的数据类型:inferSchema

    默认值为false,可以设置为true

    官方提供案例:

     

     

     

     

    当读取CSV/TSV格式数据文件首行是否是列名称,读取数据方式(参数设置)不一样的 。

    l 1:首行是列的名称,如下方式读取数据文件

            // TODO: 读取TSV格式数据

            val ratingsDF: DataFrame = spark.read

                // 设置每行数据各个字段之间的分隔符, 默认值为 逗号

                .option("sep", " ")

                // 设置数据文件首行为列名称,默认值为 false

                .option("header", "true")

                // 自动推荐数据类型,默认值为false

                .option("inferSchema", "true")

                // 指定文件的路径

                .csv("datas/ml-100k/u.dat")

            

            ratingsDF.printSchema()

            ratingsDF.show(10, truncate = false)

    l 2:首行不是列的名称,如下方式读取数据(设置Schema信息)

            // 定义Schema信息

            val schema = StructType(

                StructField("user_id", IntegerType, nullable = true) ::

                    StructField("movie_id", IntegerType, nullable = true) ::

                    StructField("rating", DoubleType, nullable = true) ::

                    StructField("timestamp", StringType, nullable = true) :: Nil

            )

            

            // TODO: 读取TSV格式数据

            val mlRatingsDF: DataFrame = spark.read

                // 设置每行数据各个字段之间的分隔符, 默认值为 逗号

                .option("sep", " ")

                // 指定Schema信息

                .schema(schema)

                // 指定文件的路径

                .csv("datas/ml-100k/u.data")

            

            mlRatingsDF.printSchema()

            mlRatingsDF.show(5, truncate = false)

         将DataFrame数据保存至CSV格式文件,演示代码如下:

            /**

             * 将电影评分数据保存为CSV格式数据

             */

            mlRatingsDF

                // 降低分区数,此处设置为1,将所有数据保存到一个文件中

                .coalesce(1)

                .write

                // 设置保存模式,依据实际业务场景选择,此处为覆写

                .mode(SaveMode.Overwrite)

                .option("sep", ",")

                // TODO: 建议设置首行为列名

                .option("header", "true")

                .csv("datas/ml-csv-" + System.nanoTime())

      

    l 示例代码

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    /**
     * SparkSQL 读取CSV/TSV格式数据:
     * i). 指定Schema信息
     * ii). 是否有header设置
     */
    object SparkSQLCsv {

        def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
              .appName(this.getClass.getSimpleName.stripSuffix("$"))
              .master("local[*]")
              // 通过装饰模式获取实例对象,此种方式为线程安全的
              .getOrCreate()

            val sc: SparkContext = spark.sparkContext
            sc.setLogLevel("WARN")
            import spark.implicits._

            /**
             * 实际企业数据分析中
             * csv sv格式数据,每个文件的第一行(head, 首行),字段的名称(列名)
             */
            // TODO: 读取CSV格式数据
            val ratingsDF: DataFrame = spark.read

                // 设置每行数据各个字段之间的分隔符, 默认值为 逗号
                .option("sep", " ")

                // 设置数据文件首行为列名称,默认值为 false
                .option("header", "true")

                // 自动推荐数据类型,默认值为false
                .option("inferSchema", "true")

                // 指定文件的路径
                .csv("data/input/rating_100k_with_head.data")

            
            ratingsDF.printSchema()
            ratingsDF.show(10, truncate = false)
            
            println("=======================================================")
            // 定义Schema信息
            val schema = StructType(

                StructField("user_id", IntegerType, nullable = true) ::
                    StructField("movie_id", IntegerType, nullable = true) ::
                    StructField("rating", DoubleType, nullable = true) ::
                    StructField("timestamp", StringType, nullable = true) :: Nil
            )

            
            // TODO: 读取CSV格式数据
            val mlRatingsDF: DataFrame = spark.read

                // 设置每行数据各个字段之间的分隔符, 默认值为 逗号
                .option("sep", " ")

                // 指定Schema信息
                .schema(schema)

                // 指定文件的路径
                .csv("data/input/rating_100k.data")

            
            mlRatingsDF.printSchema()
            mlRatingsDF.show(10, truncate = false)
            
            println("=======================================================")
            /**
             * 将电影评分数据保存为CSV格式数据
             */
            mlRatingsDF

                // 降低分区数,此处设置为1,将所有数据保存到一个文件中
                .coalesce(1)

                .write
                // 设置保存模式,依据实际业务场景选择,此处为覆写
                .mode(SaveMode.Overwrite)

                .option("sep", ",")
                // TODO: 建议设置首行为列名
                .option("header", "true")

                .csv("data/output/ml-csv-" + System.currentTimeMillis())
            
            // 关闭资源
            spark.stop()

        }
        
    }

     

    3.7.1.4  parquet 数据

    SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据,通过参数【spark.sql.sources.default】设置,默认值为【parquet】。

    l 示例代码:

    直接load加载parquet数据和指定parquet格式加载数据。

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
      * SparkSQL读取Parquet列式存储数据
      */
    object SparkSQLParquet {

        def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
              .appName(this.getClass.getSimpleName.stripSuffix("$"))
              .master("local[*]")
              // 通过装饰模式获取实例对象,此种方式为线程安全的
              .getOrCreate()

            val sc: SparkContext = spark.sparkContext
            sc.setLogLevel("WARN")
            import spark.implicits._

            // TODO: LocalFS上读取parquet格式数据
            val usersDF: DataFrame = spark.read.parquet("data/input/users.parquet")

            usersDF.printSchema()
            usersDF.show(10, truncate = false)

            println("==================================================")

            // SparkSQL默认读取文件格式为parquet
            val df = spark.read.load("data/input/users.parquet")

            df.printSchema()
            df.show(10, truncate = false)

            // 应用结束,关闭资源
            spark.stop()

        }
    }

     

    运行程序结果:

     

     

     

     

     

     

    3.7.1.5  jdbc 数据

    回顾在SparkCore中读取MySQL表的数据通过JdbcRDD来读取的,在SparkSQL模块中提供对应接口,提供三种方式读取数据:

    l 1:单分区模式

     

     

     

     

    l 2:多分区模式,可以设置列的名称,作为分区字段及列的值范围和分区数目

     

     

     

     

    l 3:高度自由分区模式,通过设置条件语句设置分区数据及各个分区数据范围

     

     

     

     

    当加载读取RDBMS表的数据量不大时,可以直接使用单分区模式加载;当数据量很多时,考虑使用多分区及自由分区方式加载。

    从RDBMS表中读取数据,需要设置连接数据库相关信息,基本属性选项如下:

     

     

     

     

    l 演示代码如下:

    // 连接数据库三要素信息

            val url: String = "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"

            val table: String = "db_shop.so"

            // 存储用户和密码等属性

            val props: Properties = new Properties()

            props.put("driver", "com.mysql.cj.jdbc.Driver")

            props.put("user", "root")

            props.put("password", "123456")

            

            // TODO: 从MySQL数据库表:销售订单表 so

            // def jdbc(url: String, table: String, properties: Properties): DataFrame

            val sosDF: DataFrame = spark.read.jdbc(url, table, props)

            println(s"Count = ${sosDF.count()}")

            sosDF.printSchema()

            sosDF.show(10, truncate = false)

    可以使用option方法设置连接数据库信息,而不使用Properties传递,代码如下:

    // TODO: 使用option设置参数

            val dataframe: DataFrame = spark.read

                .format("jdbc")

                .option("driver", "com.mysql.cj.jdbc.Driver")

                .option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")

                .option("user", "root")

                .option("password", "123456")

                .option("dbtable", "db_shop.so")

                .load()

            dataframe.show(5, truncate = false)

     

    3.7.2  加载/保存数据-API

        SparkSQL提供一套通用外部数据源接口,方便用户从数据源加载和保存数据,例如从MySQL表中既可以加载读取数据:load/read,又可以保存写入数据:save/write。

     

     

     

     

    由于SparkSQL没有内置支持从HBase表中加载和保存数据,但是只要实现外部数据源接口,也能像上面方式一样读取加载数据。

     

    3.7.2.1  Load 加载数据

    在SparkSQL中读取数据使用SparkSession读取,并且封装到数据结构Dataset/DataFrame中。

     

     

     

     

    DataFrameReader专门用于加载load读取外部数据源的数据,基本格式如下:

     

     

     

     

    SparkSQL模块本身自带支持读取外部数据源的数据:

     

     

     

     

    总结起来三种类型数据,也是实际开发中常用的:

    1:文件格式数据

    文本文件text、csv文件和json文件

    2:列式存储数据

    Parquet格式、ORC格式

    3:数据库表

    关系型数据库RDBMS:MySQL、DB2、Oracle和MSSQL

    Hive仓库表

     

     

     

     

    官方文档:http://spark.apache.org/docs/2.4.5/sql-data-sources-load-save-functions.html

    此外加载文件数据时,可以直接使用SQL语句,指定文件存储格式和路径:

     

     

     

     

     

    3.7.2.2  Save 保存数据

    SparkSQL模块中可以从某个外部数据源读取数据,就能向某个外部数据源保存数据,提供相应接口,通过DataFrameWrite类将数据进行保存。

     

     

     

     

    与DataFrameReader类似,提供一套规则,将数据Dataset保存,基本格式如下:

     

     

     

     

    SparkSQL模块内部支持保存数据源如下:

     

     

     

     

    所以使用SpakrSQL分析数据时,从数据读取,到数据分析及数据保存,链式操作,更多就是ETL操作。当将结果数据DataFrame/Dataset保存至Hive表中时,可以设置分区partition和分桶bucket,形式如下:

     

     

     

     

     

     

    3.7.2.3  保存模式(SaveMode

         将Dataset/DataFrame数据保存到外部存储系统中,考虑是否存在,存在的情况下的下如何进行保存,DataFrameWriter中有一个mode方法指定模式:

     

     

     

     

    通过源码发现SaveMode时枚举类,使用Java语言编写,如下四种保存模式:

    1:Append 追加模式,当数据存在时,继续追加;

    2:Overwrite 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;

    3:ErrorIfExists 存在及报错;

    4:Ignore 忽略,数据存在时不做任何操作;

    实际项目依据具体业务情况选择保存模式,通常选择Append和Overwrite模式。

    3.7.3  案例演示

    package cn.itcast.sql

    import java.util.Properties

    import cn.itcast.sql.SparkSQLDemo02_CreateDFDS.Person

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL-外部数据源
     */
    object SparkSQLDemo07_Datasource {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        val spark: SparkSession = SparkSession.builder().appName("hello").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._

        //获取DF/DS
        //方式1:RDD-->DF/DS:兼容之前的RDD的项目
        //方式2:直接读取为DF/DS:优先考虑使用,支持多种数据源/数据格式:json/csv/parquet/jdbc....

        //需求:准备一个DF,写入到不同的数据格式/数据源中,然后再读出来
        //2.准备一个DF
        val fileRDD: RDD[String] = sc.textFile("data/input/person.txt")

        val personRDD: RDD[Person] = fileRDD.map(line => {
          val arr: Array[String] = line.split(" ")
          Person(arr(0).toInt, arr(1), arr(2).toInt)
        })
        import spark.implicits._ //隐式转换
        val df: DataFrame = personRDD.toDF()

        df.printSchema()
        df.show(false)

        //TODO
        //df.coalesce(1).write.mode(SaveMode.Overwrite)
        //.text("data/output/text")//注意:往普通文件写不支持Schema
        df.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json")

        df.coalesce(1).write.mode(SaveMode.Overwrite).csv("data/output/csv")
        df.coalesce(1).write.mode(SaveMode.Overwrite).parquet("data/output/parquet")
        val prop = new Properties()
        prop.setProperty("user","root")
        prop.setProperty("password","root")
        df.coalesce(1).write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)//表会自动创建

        //TODO
        //spark.read.text("data/output/text").show(false)
        spark.read.json("data/output/json").show(false)

        spark.read.csv("data/output/csv").toDF("id1","name1","age1").show(false)
        spark.read.parquet("data/output/parquet").show(false)
        spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show(false)

        sc.stop()
        spark.stop()
      }
      case class Person(id:Int,name:String,age:Int)
    }

     

    3.8  扩展:SparkSQL开窗函数

    3.8.1  概述

    https://www.cnblogs.com/qiuting/p/7880500.html

    ●介绍

    开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。

    开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

     

    ●聚合函数和开窗函数

    聚合函数是将多行变成一行,count,avg....

    开窗函数是将一行变成多行;

    聚合函数如果要显示其他的列必须将列加入到group by中

    开窗函数可以不使用group by,直接将所有信息显示出来

     

    ●开窗函数分类

    1.聚合开窗函数

    聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。

    2.排序开窗函数

    排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

    3.8.2  聚合开窗函数

    ●示例1

    OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。

    SQL标准允许将所有聚合函数用做聚合开窗函数。

    spark.sql("select  count(name)  from scores").show

    spark.sql("select name, class, score, count(name) over() name_count from scores").show

     

    查询结果如下所示:

    +----+-----+-----+----------+                                                   

    |name|class|score|name_count|

    +----+-----+-----+----------+

    |  a1|    1|   80|        11|

    |  a2|    1|   78|        11|

    |  a3|    1|   95|        11|

    |  a4|    2|   74|        11|

    |  a5|    2|   92|        11|

    |  a6|    3|   99|        11|

    |  a7|    3|   99|        11|

    |  a8|    3|   45|        11|

    |  a9|    3|   55|        11|

    | a10|    3|   78|        11|

    | a11|    3|  100|        11|

    +----+-----+-----+----------+

     ●示例2

    OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。

    如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。

    开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。

    与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。

    下面的 SQL 语句用于显示按照班级分组后每组的人数:

    OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。

    spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show

    查询结果如下所示:

    +----+-----+-----+----------+                                                   

    |name|class|score|name_count|

    +----+-----+-----+----------+

    |  a1|    1|   80|         3|

    |  a2|    1|   78|         3|

    |  a3|    1|   95|         3|

    |  a6|    3|   99|         6|

    |  a7|    3|   99|         6|

    |  a8|    3|   45|         6|

    |  a9|    3|   55|         6|

    | a10|    3|   78|         6|

    | a11|    3|  100|         6|

    |  a4|    2|   74|         2|

    |  a5|    2|   92|         2|

    +----+-----+-----+----------+

    3.8.3  排序开窗函数

    3.8.3.1   ROW_NUMBER顺序排序

    row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号

    注意:

    在排序开窗函数中使用 PARTITION  BY 子句需要放置在ORDER  BY 子句之前。

     ●示例1

    spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()

    +----+-----+-----+----+

    |name|class|score|rank|

    +----+-----+-----+----+

    |  a8|    3|   45|   1|

    |  a9|    3|   55|   2|

    |  a4|    2|   74|   3|

    |  a2|    1|   78|   4|

    | a10|    3|   78|   5|

    |  a1|    1|   80|   6|

    |  a5|    2|   92|   7|

    |  a3|    1|   95|   8|

    |  a6|    3|   99|   9|

    |  a7|    3|   99|  10|

    | a11|    3|  100|  11|

    +----+-----+-----+----+

    spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()

    +----+-----+-----+----+                                                         

    |name|class|score|rank|

    +----+-----+-----+----+

    |  a2|    1|   78|   1|

    |  a1|    1|   80|   2|

    |  a3|    1|   95|   3|

    |  a8|    3|   45|   1|

    |  a9|    3|   55|   2|

    | a10|    3|   78|   3|

    |  a6|    3|   99|   4|

    |  a7|    3|   99|   5|

    | a11|    3|  100|   6|

    |  a4|    2|   74|   1|

    |  a5|    2|   92|   2|

    +----+-----+-----+----+

     

    3.8.3.2   RANK跳跃排序

    rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。

    这个函数求出来的排名结果可以并列,并列排名之后的排名将是并列的排名加上并列数

    简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名

    ●示例2

    spark.sql("select name, class, score, rank() over(order by score) rank from scores").show()                                                     

    +----+-----+-----+----+

    |name|class|score|rank|

    +----+-----+-----+----+

    |  a8|    3|   45|   1|

    |  a9|    3|   55|   2|

    |  a4|    2|   74|   3|

    | a10|    3|   78|   4|

    |  a2|    1|   78|   4|

    |  a1|    1|   80|   6|

    |  a5|    2|   92|   7|

    |  a3|    1|   95|   8|

    |  a6|    3|   99|   9|

    |  a7|    3|   99|   9|

    | a11|    3|  100|  11|

    +----+-----+-----+----+

    spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()

    +----+-----+-----+----+                                                         

    |name|class|score|rank|

    +----+-----+-----+----+

    |  a2|    1|   78|   1|

    |  a1|    1|   80|   2|

    |  a3|    1|   95|   3|

    |  a8|    3|   45|   1|

    |  a9|    3|   55|   2|

    | a10|    3|   78|   3|

    |  a6|    3|   99|   4|

    |  a7|    3|   99|   4|

    | a11|    3|  100|   6|

    |  a4|    2|   74|   1|

    |  a5|    2|   92|   2|

    +----+-----+-----+----+

     

     

    3.8.3.3   DENSE_RANK连续排序

    dense_rank() over(order by  score) as  dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。

    这个函数并列排名之后的排名只是并列排名加1

    简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名

    ●示例3

    spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()

    +----+-----+-----+----+

    |name|class|score|rank|

    +----+-----+-----+----+

    |  a8|    3|   45|   1|

    |  a9|    3|   55|   2|

    |  a4|    2|   74|   3|

    |  a2|    1|   78|   4|

    | a10|    3|   78|   4|

    |  a1|    1|   80|   5|

    |  a5|    2|   92|   6|

    |  a3|    1|   95|   7|

    |  a6|    3|   99|   8|

    |  a7|    3|   99|   8|

    | a11|    3|  100|   9|

    +----+-----+-----+----+

    spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()

    +----+-----+-----+----+                                                         

    |name|class|score|rank|

    +----+-----+-----+----+

    |  a2|    1|   78|   1|

    |  a1|    1|   80|   2|

    |  a3|    1|   95|   3|

    |  a8|    3|   45|   1|

    |  a9|    3|   55|   2|

    | a10|    3|   78|   3|

    |  a6|    3|   99|   4|

    |  a7|    3|   99|   4|

    | a11|    3|  100|   5|

    |  a4|    2|   74|   1|

    |  a5|    2|   92|   2|

    +----+-----+-----+----+

     

    3.8.3.4   NTILE分组排名[了解]

     

     

    ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。

     ●示例4

    spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()

    +----+-----+-----+----+

    |name|class|score|rank|

    +----+-----+-----+----+

    |  a8|    3|   45|   1|

    |  a9|    3|   55|   1|

    |  a4|    2|   74|   2|

    |  a2|    1|   78|   2|

    | a10|    3|   78|   3|

    |  a1|    1|   80|   3|

    |  a5|    2|   92|   4|

    |  a3|    1|   95|   4|

    |  a6|    3|   99|   5|

    |  a7|    3|   99|   5|

    | a11|    3|  100|   6|

    +----+-----+-----+----+

    spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()

    +----+-----+-----+----+                                                         

    |name|class|score|rank|

    +----+-----+-----+----+

    |  a2|    1|   78|   1|

    |  a1|    1|   80|   2|

    |  a3|    1|   95|   3|

    |  a8|    3|   45|   1|

    |  a9|    3|   55|   2|

    | a10|    3|   78|   3|

    |  a6|    3|   99|   4|

    |  a7|    3|   99|   5|

    | a11|    3|  100|   6|

    |  a4|    2|   74|   1|

    |  a5|    2|   92|   2|

    +----+-----+-----+----+

    3.8.3.5  代码演示

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    /**
     * Author itcast
     * Desc 使用SparkSQL支持的开窗函数/窗口函数完成对各个班级的学生成绩的排名
     */
    object RowNumberDemo {

      case class Score(name: String, clazz: Int, score: Int)
      def main(args: Array[String]): Unit = {
        //1.准备环境
        val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._

        //2.加载数据
        val scoreDF: DataFrame = sc.makeRDD(Array(

          Score("a1", 1, 80),
          Score("a2", 1, 78),
          Score("a3", 1, 95),
          Score("a4", 2, 74),
          Score("a5", 2, 92),
          Score("a6", 3, 99),
          Score("a7", 3, 99),
          Score("a8", 3, 45),
          Score("a9", 3, 55),
          Score("a10", 3, 78),
          Score("a11", 3, 100))
        ).toDF("name", "class", "score")
        scoreDF.createOrReplaceTempView("t_scores")
        scoreDF.show()
        /*
        +----+-----+-----+
        |name|class|score|num
        +----+-----+-----+
        |  a1|    1|   80|
        |  a2|    1|   78|
        |  a3|    1|   95|
        |  a4|    2|   74|
        |  a5|    2|   92|
        |  a6|    3|   99|
        |  a7|    3|   99|
        |  a8|    3|   45|
        |  a9|    3|   55|
        | a10|    3|   78|
        | a11|    3|  100|
        +----+-----+-----+
         */


        //使用ROW_NUMBER顺序排序
        spark.sql("select name, class, score, row_number() over(partition by class order by score) num from t_scores").show()

        //使用RANK跳跃排序
        spark.sql("select name, class, score, rank() over(partition by class order by score) num from t_scores").show()

        //使用DENSE_RANK连续排序
        spark.sql("select name, class, score, dense_rank() over(partition by class order by score) num from t_scores").show()


        /*
    ROW_NUMBER顺序排序--1234
    +----+-----+-----+---+
    |name|class|score|num|
    +----+-----+-----+---+
    |  a2|    1|   78|  1|
    |  a1|    1|   80|  2|
    |  a3|    1|   95|  3|
    |  a8|    3|   45|  1|
    |  a9|    3|   55|  2|

    | a10|    3|   78|  3|
    |  a6|    3|   99|  4|
    |  a7|    3|   99|  5|
    | a11|    3|  100|  6|

    |  a4|    2|   74|  1|
    |  a5|    2|   92|  2|
    +----+-----+-----+---+

    使用RANK跳跃排序--1224
    +----+-----+-----+---+
    |name|class|score|num|
    +----+-----+-----+---+
    |  a2|    1|   78|  1|
    |  a1|    1|   80|  2|
    |  a3|    1|   95|  3|
    |  a8|    3|   45|  1|
    |  a9|    3|   55|  2|

    | a10|    3|   78|  3|
    |  a6|    3|   99|  4|
    |  a7|    3|   99|  4|
    | a11|    3|  100|  6|

    |  a4|    2|   74|  1|
    |  a5|    2|   92|  2|
    +----+-----+-----+---+

    DENSE_RANK连续排序--1223
    +----+-----+-----+---+
    |name|class|score|num|
    +----+-----+-----+---+
    |  a2|    1|   78|  1|
    |  a1|    1|   80|  2|
    |  a3|    1|   95|  3|
    |  a8|    3|   45|  1|
    |  a9|    3|   55|  2|

    | a10|    3|   78|  3|
    |  a6|    3|   99|  4|
    |  a7|    3|   99|  4|
    | a11|    3|  100|  5|

    |  a4|    2|   74|  1|
    |  a5|    2|   92|  2|
    +----+-----+-----+---+
         */

        /*
        
        val sql =
          """
            |select 字段1,字段2,字段n,
            |row_number() over(partition by 字段1 order by 字段2 desc) num
            |from 表名
            |having num <= 3
            |""".stripMargin

        import org.apache.spark.sql.functions._
        df.withColumn(
          "num",
          row_number().over(Window.partitionBy('字段1).orderBy('字段2.desc))
        ).filter('num <= 3).show(false)
        
         */
      }

    }

     

     

    4.  SparkSQL自定义UDF函数

    4.1  函数类型

         无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在org.apache.spark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。

    l 回顾Hive中自定义函数有三种类型:

    1:UDF(User-Defined-Function) 函数

    一对一的关系,输入一个值经过函数以后输出一个值;

    在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

    2:UDAF(User-Defined Aggregation Function) 聚合函数

    多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;

    3:UDTF(User-Defined Table-Generating Functions) 函数

    一对多的关系,输入一个值输出多个值(一行变为多行);

    用户自定义生成函数,有点像flatMap;

     

    l 注意:在SparkSQL中,目前仅仅支持UDF函数和UDAF函数

    目前来说Spark 框架各个版本及各种语言对自定义函数的支持:

     

     

     

     

     

    4.2  两种方式

    UDF函数也有DSL和SQL两种方式

     

    1.SQL方式

         使用SparkSession中udf方法定义和注册函数,在SQL中使用,使用如下方式定义:

     

     

     

     

    2.DSL方式

        使用org.apache.sql.functions.udf函数定义和注册函数,在DSL中使用,如下方式:

     

     

     

    4.3  代码演示

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
     * Author itcast
     * Desc 演示SparkSQL-自定义UDF
     */
    object SparkSQLDemo08_UDF {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        val spark: SparkSession = SparkSession.builder().appName("hello").master("local[*]").getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")

        import org.apache.spark.sql.functions._
        import spark.implicits._

        //2.获取数据DF->DS->RDD
        val df: DataFrame = spark.read.text("data/input/udf.txt")

        df.printSchema()
        df.show(false)
        /*
        root
       |-- value: string (nullable = true)

      +-----+
      |value|
      +-----+
      |hello|
      |haha |
      |hehe |
      |xixi |
      +-----+
         */

        //TODO =======SQL风格=======
        //3.自定义UDF:String-->大写
        spark.udf.register("small2big",(value:String)=>{

          value.toUpperCase
        })

        //4.执行查询转换
        df.createOrReplaceTempView("t_words")

        val sql =
          """
            |select value,small2big(value) big_value
            |from t_words
            |""".stripMargin
        spark.sql(sql).show(false)

        //TODO =======DSL风格=======
        //3.自定义UDF:String-->大写


        //4.执行查询转换
        val small2big2 = udf((value:String)=>{

          value.toUpperCase
        })
        df.select('value,small2big2('value).as("big_value2")).show(false)


        //5.关闭资源
        sc.stop()

        spark.stop()
      }
    }

     

    5.  Spark On Hive

    http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

    Spark SQL模块从发展来说,从Apache Hive框架而来,发展历程:Hive(MapReduce)-> Shark (Hive on Spark) -> Spark SQL(SchemaRDD -> DataFrame -> Dataset),

    SparkSQL天然无缝集成Hive,可以加载Hive表数据进行分析。

    5.1  HiveOnSpark和SparkOnHive

    l HiveOnSpark:SparkSql诞生之前的Shark项目使用的,是把Hive的执行引擎换成Spark,剩下的使用Hive的,严重依赖Hive,早就淘汰了没有人用了

    l SparkOnHive:SparkSQL诞生之后,Spark提出的,是仅仅使用Hive的元数据(库/表/字段/位置等信息...),剩下的用SparkSQL的,如:执行引擎,语法解析,物理执行计划,SQL优化

     

     

    5.2  spark-sql集成Hive

    l 本质

    SparkSQL集成Hive本质就是:SparkSQL读取Hive的元数据MetaStore

    l 操作

    1启动Hive的元数据库服务

    hive所在机器node2上启动

    nohup /export/server/hive/bin/hive --service metastore &

    注意:Spark3.0需要Hive2.3.7版本

    2告诉SparkSQL:Hive的元数据库在哪里

    哪一台机器需要使用spark-sql命令行整合hive就把下面的配置放在哪一台

    也可以将hive-site.xml分发到集群中所有Spark的conf目录,此时任意机器启动应用都可以访问Hive表数据。

    cd /export/server/spark/conf/

    vim hive-site.xml

    <?xml version="1.0"?>

    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

    <configuration>

        <property>

          <name>hive.metastore.warehouse.dir</name>

          <value>/user/hive/warehouse</value>

        </property>

        <property>

          <name>hive.metastore.local</name>

          <value>false</value>

        </property>

        <property>

          <name>hive.metastore.uris</name>

          <value>thrift://node2:9083</value>

        </property>

    </configuration>

     

    3使用sparksql操作hive

    /export/server/spark/bin/spark-sql --master local[2] --conf spark.sql.shuffle.partitions=2

    show databases;

    show tables;

    CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by ' ';

    LOAD DATA LOCAL INPATH 'file:///root/person.txt' INTO TABLE person;

    show tables;

    select * from person;

     

    vim /root/person.txt

    1 zhangsan 20

    2 lisi 29

    3 wangwu 25

    4 zhaoliu 30

    5 tianqi 35

    6 kobe 40

     

     

     

     

    5.3  Spark代码中集成Hive

    l 操作

    1.开启hive元数据库

    nohup /export/server/hive/bin/hive --service metastore &

     

    2.添加依赖:

    <!--SparkSQL+ Hive依赖-->

    <dependency>

                <groupId>org.apache.spark</groupId>

                <artifactId>spark-hive_2.12</artifactId>

                <version>${spark.version}</version>

    </dependency>

     

    3.在代码中告诉SparkSQL:Hive的元数据服务的配置

     

    l 代码实现:

    package cn.itcast.sql

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession

    /**
     * Author itcast
     * Desc 演示SparkSQL-OnHive的元数据库(语法解析,物理执行计划生成,执行引擎,SQL优化都是用的Spark)
     */
    object SparkSQLDemo09_SparkOnHive {

      def main(args: Array[String]): Unit = {
        //1.准备SparkSQL开发环境
        val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]")

          .config("spark.sql.shuffle.partitions", "4")//默认是200,本地测试给少一点
          .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")//指定Hive数据库在HDFS上的位置
          .config("hive.metastore.uris", "thrift://node2:9083")

          .enableHiveSupport()//开启对hive语法的支持
          .getOrCreate()

        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")

        //2.执行Hive-SQL
        spark.sql("show databases").show(false)

        spark.sql("show tables").show(false)
        spark.sql("CREATE TABLE person2 (id int, name string, age int) row format delimited fields terminated by ' '")
        spark.sql("LOAD DATA LOCAL INPATH 'file:///D:/person.txt' INTO TABLE person2")
        spark.sql("show tables").show(false)
        spark.sql("select * from person2").show(false)

        //5.关闭资源
        sc.stop()

        spark.stop()
      }
    }

     

    6.  分布式SQL引擎

    http://spark.apache.org/docs/latest/sql-distributed-sql-engine.html

    6.1  说明

    6.1.1  Hive的SQL交互方式

    方式1:交互式命令行(CLI)

    n bin/hive,编写SQL语句及DDL语句

    方式2:启动服务HiveServer2(Hive ThriftServer2)

    n 将Hive当做一个服务启动(类似MySQL数据库,启动一个服务),端口为10000

    n 交互式命令行,bin/beeline,CDH 版本HIVE建议使用此种方式,CLI方式过时

    n 2JDBC/ODBC方式,类似MySQL中JDBC/ODBC方式

    6.1.2  SparkSQL交互方式

    SparkSQL模块从Hive框架衍生发展而来,所以Hive提供的所有功能(数据分析交互式方式)都支持

    方式1: 上一章节已经学习了

    n SparkSQL命令行或SparkSQL代码中访问

    方式2: 接下来进行学习

    n 启动sparkSQL的thriftserver使用beeline或使用JDBC协议访问

     

    l 补充 :sparksql的thriftserver

    Spark Thrift Server将Spark Applicaiton当做一个服务运行,提供Beeline客户端和JDBC方式访问,与Hive中HiveServer2服务一样的。

    在实际大数据分析项目中,使用SparkSQL时,往往启动一个ThriftServer服务,分配较多资源(Executor数目和内存、CPU),不同的用户启动beeline客户端连接,编写SQL语句分析数据。

     

     

     

     

     

     

     

    在$SPARK_HOME目录下的sbin目录,有相关的服务启动命令:

     

    6.2  使用beeline 客户端连接

    1. 启动SaprkSQL的thriftserver--类似与Hive的HiveServer2

    node1上执行

    /export/server/spark/sbin/start-thriftserver.sh

    --hiveconf hive.server2.thrift.port=10000

    --hiveconf hive.server2.thrift.bind.host=node1

    --master local[2]

    http://node1:4040/jobs/

    如果需要停止:

    /export/server/spark/sbin/stop-thriftserver.sh

     

    2.使用SparkSQLbeeline客户端命令行连接ThriftServer

    /export/server/spark/bin/beeline

    !connect jdbc:hive2://node1:10000

    root

    123456

     

    3.编写SQL语句执行分析:

    show databases;

    show tables;

    select * from person;

     

    4.WEB UI界面:

    http://node1:4040/jobs/

     

    6.3  JDBC/ODBC 客户端

    https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC

    SparkSQL中提供类似JDBC/ODBC方式,连接Spark ThriftServer服务,执行SQL语句

     

    1.添加Maven依赖库:

    <dependency>

                <groupId>org.apache.spark</groupId>

                <artifactId>spark-hive-thriftserver_2.12</artifactId>

                <version>${spark.version}</version>

    </dependency>

     

    2.代码演示

    package cn.itcast.sql

    import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

    /**
     * SparkSQL 启动ThriftServer服务,通过JDBC方式访问数据分析查询
     */
    object SparkSQLDemo10_ThriftJDBC {

      def main(args: Array[String]): Unit = {
        // 1.加载驱动
        Class.forName("org.apache.hive.jdbc.HiveDriver") //看上去像是在使用Hiveserver2,本质上使用SparkThriftServer
        // 2.获取连接Connection
        val conn: Connection = DriverManager.getConnection(

          "jdbc:hive2://node1:10000/default", //看上去像是在使用Hiveserver2,本质上使用SparkThriftServer
          "root",

          "123456"
        )
        // 3.构建查询语句
        val sqlStr: String = """select * from person"""

        val ps: PreparedStatement = conn.prepareStatement(sqlStr)
        // 4.执行查询,获取结果
        val rs: ResultSet = ps.executeQuery()

        // 5.处理查询结果
        while (rs.next()) {

          println(s"id = ${rs.getInt(1)}, name = ${rs.getString(2)}, age = ${rs.getInt(3)}}")
        }
        if (null != rs) rs.close()
        if (null != ps) ps.close()
        if (null != conn) conn.close()
      }
    }

     

     

  • 相关阅读:
    加工零件(Dijkstra)
    尼克的任务(DP)
    挖地雷(记忆化搜索)
    滑雪(DP,记忆化搜索)
    子串和子序列(DP)
    八皇后(DFS)
    打印feign报错日志
    restTemplate 踩坑
    共享全局对象
    获取当月多少天
  • 原文地址:https://www.cnblogs.com/shan13936/p/14234281.html
Copyright © 2011-2022 走看看