zoukankan      html  css  js  c++  java
  • 大数据实践(十一)SparkSQL模块基础

    SparkSQL是Spark的一个子模块,主要用于操作结构化数据,借鉴了Hive。

    此前使用的是SparkCore模块的RDD结构进行数据处理,SparkSQL提供了结构化的数据结构DataFrame、DataSet。

    SparkSQL支持SQL、DSL(domain-specific language)两种方式、多种语言(Scala、Java、Python、R)进行开发,最后底层都转换为RDD.

    SparkSQL支持多种数据源(Hive,Avro,Parquet,ORC,JSON 和 JDBC 等)、支持 HiveQL 语法以及 Hive SerDes 和 UDF,允许你访问现有的 Hive 仓库.

    零、SparkSession、IDEA整合Spark

    SparkSession:这是一个新入口,取代了原本的SQLContext与HiveContext。

    SparkContext也可以通过SparkSession获得。

    1、SparkSession

    交互式环境下启动spark后,自带一个变量spark

    Spark context available as 'sc' (master = local[*], app id = local-1608185209816).
    Spark session available as 'spark'.
    
    #读取文件
    scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
    text: org.apache.spark.sql.Dataset[String] = [value: string]  
    
    scala> text.show()
    +--------------------+                                                          
    |               value|
    +--------------------+
    |java hadoop spark...|
    |spark scala spark...|
    |    scala spark hive|
    +--------------------+
    
    

    在文件程序中,使用builder方法获取,本身没有公开的相关构造器可以被使用。

    val spark: SparkSession = SparkSession.builder().master("local[3]").getOrCreate()
    

    通过SparkSession入口可以进行数据加载、保存、执行SQL、以及获得其他入口(sqlContext、SparkContext)

    2、使用IDEA开发Spark

    Spark使用本地开发,将其集成到IDEA中,首先需要有scalasdk,版本要和操作的spark适配。

    其次IDEA需要已经安装过scala开发插件。

    接下来在maven项目配置文件中导入以下依赖地址即可。

     	<!-- SparkCore,基础模块 -->
    	<dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.0.0-preview2</version>
            </dependency>
    
            <!-- SparkSQL,SQL模块 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.0.0-preview2</version>
            </dependency>
    
    
            <!-- Spark和hive集成的模块,可用于处理hive中的数据 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.12</artifactId>
                <version>3.0.0-preview2</version>
                <scope>provided</scope>
            </dependency>
    <!--mysql驱动,与Mysql交互需要-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.22</version>
            </dependency>
    

    如果日志太多碍眼,可以调高日志等级,第一行修改为ERROR,在resources目录下使用log4j.properties文件覆盖默认日志配置文件。

    #hadoop.root.logger=warn,console
    # Set everything to be logged to the console
    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    # Set the default spark-shell log level to WARN. When running the spark-shell, the
    # log level for this class is used to overwrite the root logger's log level, so that
    # the user can have different defaults for the shell and regular Spark apps.
    log4j.logger.org.apache.spark.repl.Main=WARN
    # Settings to quiet third party logs that are too verbose
    log4j.logger.org.spark_project.jetty=WARN
    log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
    # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
    log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
    log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
    # Parquet related logging
    log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
    log4j.logger.parquet.CorruptStatistics=ERROR
    

    一、RDD、DataFrame、DataSet

    RDD作为SparkCore的数据结构,可以处理结构化非结构化的数据,但效率较低。

    SparkSQL中封装了对结构化数据处理效果更好的DataFrame、DateSet.

    RDD以行为单位,对行有数据类型。

    DataFrame来自pandas库的数据结构,相比于RDD提供了字段类型,DataFrame 内部的有明确 Scheme 结构,即列名、列字段类型都是已知的,这带来的好处是可以减少数据读取以及更好地优化执行计划,从而保证查询效率。

    Dataset 也是分布式的数据集合,在 Spark 1.6 版本被引入,它集成了 RDD 和 DataFrame
    的优点,具备强类型的特点,同时支持 Lambda 函数,但只能在 Scala 和 Java 语言中使用。

    三者可以互相转换。

    对文本文件返回了 Dataset[String]

    scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
    text: org.apache.spark.sql.Dataset[String] = [value: string]
    
    scala> text.show()
    +--------------------+                                                          
    |               value|
    +--------------------+
    |java hadoop spark...|
    |spark scala spark...|
    |    scala spark hive|
    +--------------------+
    
    
    #结构
    scala> text.printSchema
    root
     |-- value: string (nullable = true)
    
    #返回一个new session
    scala> val nsp = spark.newSession
    nsp: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c01b
    
    #读取json默认返回DF
    scala> val user = nsp.read.json("/usr/local/bigdata/file/user.json")
    user: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]
    
    #定义一个样例类,作为数据类型
    scala> case class User(id :Long,username:String,email:String){}
    defined class User
    
    #加入数据类型,返回ds
    scala> val userds = nsp.read.json("/usr/local/bigdata/file/user.json").as[User]
    userds: org.apache.spark.sql.Dataset[User] = [email: string, id: bigint ... 1 more field]
    
    

    内部结构

    scala> userds.printSchema
    root
     |-- email: string (nullable = true)
     |-- id: long (nullable = true)
     |-- username: string (nullable = true)
    
    

    互相转换:

    scala> val udf = user.rdd
    udf: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at rdd at <console>:25
    
    scala> userds.toDF
    res3: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]
    

    二、SQL、DSL

    SparkSQL可以使用SQL编程,也可以基于特定语言,调用api进行编程。

    1、SQL

    使用SQL编程有两步:

    ​ 1、创建临时视图。(有表或视图才能执行sql,spark可以创建临时视图)

    ​ 2、编写sql语句。

    #创建视图users
    scala> userds.createOrReplaceTempView("users")
    
    #编写语句
    scala> nsp.sql("select id,username from users order by id desc").show
    +---+--------+
    | id|username|
    +---+--------+
    |  3|     Jim|
    |  2|    jack|
    |  1|    Bret|
    +---+--------+
    
    /*
    createGlobalTempView            createOrReplaceTempView   crossJoin   
    createOrReplaceGlobalTempView   createTempView 
    */
    
    

    如果在文件程序中编写,需要导入以下隐式转换。

        import spark.implicits._ #spark是变量名
        import org.apache.spark.sql.functions._
    
    2、DSL

    DSL是特定语言,如scala、java等。

    调用api进行处理。

    #scala DSL编程
    scala> userds.select('id,'username).orderBy(desc("id")).show()
    +---+--------+
    | id|username|
    +---+--------+
    |  3|     Jim|
    |  2|    jack|
    |  1|    Bret|
    +---+--------+
    
    

    使用Java进行编程。

     final SparkSession spark = SparkSession.builder().master("local[3]").getOrCreate();
    
            Dataset<Row> dataset = spark.read().json("spark/data/user3.json");
    
            try{
                dataset.createTempView("users");
    			//SQL
                spark.sql("select id,username from users order by id desc").show();
            }catch (Exception e){
                e.printStackTrace();
            }
    		
    		//DSL
    		dataset.filter(dataset.col("id").geq(2)).orderBy(dataset.col("id").desc()).show();
    		
            spark.close();
    

    三、UDF

    UDF:Spark和Hive一样,允许用户自定义函数来补充功能。

    SQL方式和DSL方式都支持。

    1、SQL方式

    #注册函数
    scala> nsp.udf.register("plus0",(id:String)=>0+id)
    res11: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3815/2132917061@2c634771,StringType,List(Some(Schema(StringType,true))),Some(plus0),true,true)
    
    scala> nsp.sql("select plus0(id),username from users order by id desc").show
    +---------+--------+
    |plus0(id)|username|
    +---------+--------+
    |       03|     Jim|
    |       02|    jack|
    |       01|    Bret|
    +---------+--------+
    
    #注册函数
    scala> nsp.udf.register("plus1",(id:Int)=>1+id)
    res13: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3857/1612958077@7799dc9a,IntegerType,List(Some(Schema(IntegerType,false))),Some(plus1),false,true)
    
    scala> nsp.sql("select plus1(id),username from users order by id desc").show
    +---------+--------+
    |plus1(id)|username|
    +---------+--------+
    |        4|     Jim|
    |        3|    jack|
    |        2|    Bret|
    +---------+--------+
    
    
    

    2、DSL方式

                           
    #定义函数
    scala> val prefix_name =udf(
         | (name:String)=>{ "user: "+name})
    
    prefix_name: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3567/441398584@40f06edc,StringType,List(Some(Schema(StringType,true))),None,true,true)
    
    #套入字段名
    scala> userds.select($"id",prefix_name('username).as("newname")).orderBy(desc("id")).show()
    +---+----------+
    | id|   newname|
    +---+----------+
    |  3| user: Jim|
    |  2|user: jack|
    |  1|user: Bret|
    +---+----------+
    
    

    Java编写

    	#接收参数名和一个函数接口
    spark.udf().register("prefix_name", new UDF1<String, String>() {
    
               @Override
               public String call(String s) throws Exception {
                   return "user: "+s;
               }
           }, DataTypes.StringType);
    
    
            try{
                dataset.createTempView("users");
    	#和SQL一样使用
                spark.sql("select id,prefix_name(username) as new_name from users order by id desc").show();
            }catch (Exception e){
                e.printStackTrace();
            }
    
    +---+----------+
    | id|  new_name|
    +---+----------+
    |  3| user: Jim|
    |  2|user: jack|
    |  1|user: Bret|
    +---+----------+
    
    

    四、Spark on Hive

    SparkSQL集成Hive,加载读取Hive表数据进行分析,称之为Spark on Hive;

    Hive 框架底层分区引擎,可以将MapReduce改为Spark,称之为Hive on Spark;

    Spark on Hive 相当于Spark使用Hive的数据。

    Hive on Spark相当于Hive把自身的计算从MR换成SparkRDD.

    可以通过以下方式进行Spark on Hive.

    1、Sparksql

    spark-sql是spark提供的一种交互模式,使用sql语句进行处理,默认就是使用hive中的数据。

    需要将mysql驱动放到jar目录下,或者指定驱动。

    spark-sql --master local[*]
    
    #指定驱动
    spark-sql  --master local[*] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
    
    

    2、在IDEA中使用spark on hive

    添加spark操作hive的依赖。

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0-preview2</version>
        </dependency>
    

    core-site.xml,hdfs-site.xml,hive-site.xml都拷贝到Rsource目录下.

    可以使用Spark 操作Hive数据了。

     	val conf = new SparkConf().setAppName("JDBCDemo$").setMaster("local[*]")
        val spark = SparkSession.builder().config(conf)
    			 .enableHiveSupport()//!!!!默认不支持外部hive,这里需调用方法支持外部hive
    			.getOrCreate()
    
        import spark.implicits._
    
    
        spark.sql("use spark_sql_hive") 
    
    
        spark.sql(
          """
            |select t.id,t.name,t.email from
            |(select cast('id' as INT) ,id ,name,email from user where id >1001) t
          """.stripMargin).show()
    
  • 相关阅读:
    使用TFS CI 又想保留服务运行状态的简单方法
    【知乎】二战中日本陆军的建制
    缓存你的BITMAP对象
    Android 多种方式正确的加载图像,有效避免oom
    GitHub进一步了解
    响应式编程,是明智的选择
    Android 主题动态切换框架:Prism
    Android Fragment使用(二) 嵌套Fragments (Nested Fragments) 的使用及常见错误
    ClassLoader工作机制
    Java——泛型(最易懂的方式讲解泛型)
  • 原文地址:https://www.cnblogs.com/cgl-dong/p/14172475.html
Copyright © 2011-2022 走看看