zoukankan      html  css  js  c++  java
  • Spark SQL入门之wordcount案例

    Spark SQL 是Spark的核心模块,主要用以对结构化的数据(流数据&批数据)进行处理。Spark SQL依然是建立在RDD之上的ETL工具(数据源到数据仓库的一系列处理过程)。
    学习官网:http://spark.apache.org/docs/latest/sql-programming-guide.html

    一、Spark SQL数据抽象

    • Spark SQL提供了DataFrame和DataSet的数据抽象。
    • DataFrame就是RDD+Schema,可以认为是一张二维表格,劣势在于编译器不进行表格中的字段的类型检查,在运行期进行检查。
    • DataSet是Spark最新的数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame.DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。
    • DataFrame=DataSet[Row]。

    二、Spark SQL查询方式

    DataFrame查询方式

    DataFrame支持两种查询方式:一种是DSL风格,另外一种是SQL风格
    (1)、DSL风格:
    需要引入import spark.implicit. _ 这个隐式转换,可以将RDD隐式转换成DataFrame
    (2)、SQL风格:
    a、需要将DataFrame注册成一张表格,如果通过CreateTempView这种方式来创建,那么该表格Session有效,如果通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀global_temp
    b、需要通过sparkSession.sql方法来运行你的SQL语句

    DataSet查询方式

    定义一个DataSet,先定义一个Case类

    Dataset和DataFrame的区别

    Dataset:分布式数据集【数据类型Any】
    DataFrame:分布式数据集【数据类型Row】
    RDD --------> Dataset(加强版的RDD) -------> DataFrame(特殊Dataset【Row】)

    三、入门案例--WordCount

    导入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.7</version>
        </dependency>
    </dependencies>
    

    开发应用

    package quickstart
    
    import org.apache.spark.sql.SparkSession
    
    object SparkSQLWordCount {
      def main(args: Array[String]): Unit = {
        //1.构建Spark SQL中核心对象SparkSession
        val spark = SparkSession.builder().appName("wordcount").master("local[*]").getOrCreate()
        //2.通过SparkSession对象构建dataset,或者dataframe
        val rdd = spark.sparkContext.makeRDD(List("Hello Hadoop","Hello Kafka"))
        //rdd转换为ds或者df
        //scala隐式转换
        import spark.implicits._
        val dataset  = rdd.toDS()
        //如:强类型操作(操作的是类型)
        //方法一:
        /**
        dataset
          .flatMap(_.split("\s"))
          .map((_,1))
          //无类型操作
          .groupBy("_1")
          //无类型操作
          .sum("_2")
          .show()
        */
        //方法二:
        val flatMapDS = dataset.flatMap(_.split("\s"))
        flatMapDS.createTempView("t_word")
        //sql语句中,先执行后面的group  by,再执行前面的select
        spark
          .sql("select value as word ,count(value) from t_word group by value")
          .show()
    
        spark.stop()
      }
    }
    //结果
    +------+------------+
    |  word|count(value)|
    +------+------------+
    | Kafka|           1|
    | Hello|           2|
    |Hadoop|           1|
    +------+------------+
  • 相关阅读:
    flock对文件锁定读写操作的问题 简单
    hdu 2899 Strange Fuction(二分)
    hdu 2199 Can you solve this equation? (二分)
    poj 3080 Blue Jeans (KMP)
    poj 2823 Sliding Window (单调队列)
    poj 2001 Shortest Prefixes (trie)
    poj 2503 Babelfish (trie)
    poj 1936 All in All
    hdu 3507 Print Article (DP, Monotone Queue)
    fzu 1894 志愿者选拔 (单调队列)
  • 原文地址:https://www.cnblogs.com/wanpi/p/14961057.html
Copyright © 2011-2022 走看看