zoukankan      html  css  js  c++  java
  • Spark SQL

    Spark SQL 类似于Hive

    一、Spark SQL 基础

    1、什么是Spark SQL

    Spark SQL is Apache Spark’s module for working with structured data.
    Spark SQL 是spark 的一个模块。来处理 结构化 的数据
    不能处理非结构化的数据

    特点:
    1)容易集成
    不需要单独安装

    2)统一的数据访问方式
    结构化数据的类型:JDBC JSon Hive parquer文件 都可以作为Spark SQL 的数据源
    对接多种数据源,且使用方式类似

    3)完全兼容hive
    把Hive中的数据,读取到Spark SQL中运行

    4)支持标准的数据连接
    JDBC

    2、为什么学习Spark SQL

    执行效率比Hive高

    hive 2.x 执行引擎可以使用 Spark

    3、核心概念:表(DataFrame DataSet)

    mysql中的表:表结构、数据
    DataFrame:Schema、RDD(数据)

    DataSet 在spark1.6以后,对DataFrame做了一个封装

    4、创建DataFrame

    (*)测试数据:员工表、部门表
    第一种方式:使用case class
    1)定义Schema
    样本类来定义Schema

    case class 特点:
    可以支持模式匹配,使用case class建立表结构

    7521, WARD, SALESMAN,7698, 1981/2/22, 1250, 500, 30

    case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)

    2)读取文件
    val lines = sc.textFile(“/root/hd/tmp_files/emp.csv”).map(_.split(“,”))

    3)把每行数据,映射到Emp上
    val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

    4)生成DataFrame
    val df1 = allEmp.toDF

    df1.show

    第二种方式 使用Spark Session
    (1)什么是Spark Session
    Spark session available as ‘spark’.
    2.0以后引入的统一访问方式。可以访问所有的Spark组件

    def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

    (2)使用StructType来创建Schema

    val struct =
    StructType(
    StructField(“a”, IntegerType, true) ::
    StructField(“b”, LongType, false) ::
    StructField(“c”, BooleanType, false) :: Nil)

    case class Emp(
    empno:Int,
    ename:String,
    job:String,
    mgr:Int,
    hiredate:String,
    sal:Int,
    comm:Int,
    deptno:Int)

    —————–分割———————-
    import org.apache.spark.sql.types._

    val myschema = StructType(
    List(
    StructField(“empno”,DataTypes.IntegerType),
    StructField(“ename”,DataTypes.StringType),
    StructField(“job”,DataTypes.StringType),
    StructField(“mgr”,DataTypes.IntegerType),
    StructField(“hiredate”,DataTypes.StringType),
    StructField(“sal”,DataTypes.IntegerType),
    StructField(“comm”,DataTypes.IntegerType),
    StructField(“deptno”,DataTypes.IntegerType)
    ))

    准备数据 RDD[Row]
    import org.apache.spark.sql.Row

    val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

    val df2 = spark.createDataFrame(allEmp,myschema)

    df2.show

    第三种方式
    直接读取一个带格式的文件
    在/root/hd/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources有现成的json代码

    val df3 = spark.read 读文件,默认是Parquet文件
    val df3 = spark.read.json(“/uroot/hd/tmp_files/people.json”)

    df3.show

    val df4 = spark.read.format(“json”).load(“/root/hd/tmp_files/people.json”)

    df4.show

    5、操作DataFrame

    1)DSL语句
    mybatis Hibernate

    df1.printSchema
    df1.select("ename","sal").show
    df1.select($"ename",$"sal",$"sal"+100).show

    $”sal” 可以看做是一个变量

    查询薪水大于2000的员工
    df1.filter($”sal” > 2000).show

    求每个部门的员工人数
    df1.groupBy($”deptno”).count.show

    相当于select deptno,count(1) from emp group by deptno

    2)SQL语句
    注意:不能直接执行SQL,需要生成一个视图,再执行sql

    scala> df1.create
    createGlobalTempView createOrReplaceTempView createTempView

    一般用到 createOrReplaceTempView createTempView
    视图:类似于表,但不保存数据

    df1.createOrReplaceTempView(“emp”)

    操作:
    spark.sql(“select * from emp”).show

    查询薪水大于2000的员工
    spark.sql(“select * from emp where sal > 2000”).show

    求每个部门的员工人数
    spark.sql(“select deptno,count(1) from emp group by deptno”).show

    3)多表查询
    10,ACCOUNTING,NEW YORK

    case class Dept(deptno:Int,dname:String,loc:String)
    val lines = sc.textFile(“/root/hd/tmp_files/dept.csv”).map(_.split(“,”))
    val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))

    df5.createOrReplaceTempView(“dept”)

    spark.sql(“select dname,ename from emp,dept where emp.deptno=dept.deptno”).show

    6、操作DataSet

    Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)

    Dataset跟DataFrame类似,是一套新的接口,是高级的Dataframe

    举例:
    1)创建DataSet
    (1)使用序列来创建DataSet
    定义一个case class
    case class MyData(a:Int,b:String)

    生成序列,并创建DataSet
    val ds = Seq(MyData(1,”Tom”),MyData(2,”Merry”)).toDS

    .toDS 生成DataSet

    ds.show

    (2)使用JSON数据来创建DataSet

    定义case class
    case class Person(name:String,age:BigInt)

    通过Json数据来生成DataFrame
    val df = spark.read.format(“json”).load(“/root/hd/tmp_files/people.json”)

    将DataFrame转换成DataSet
    df.as[Person].show

    df.as[Person] 就是一个DataSet

    (3)使用其他数据
    RDD操作和DataFrame操作相结合 —> DataSet

    读取数据,创建DataSet
    val linesDS = spark.read.text(“/root/hd/tmp_files/test_WordCount.txt”).as[String]

    对DataSet进行操作:
    val words = linesDS.flatMap(.split(” “)).filter(.length > 3)

    words.show
    words.collect

    执行一个WordCount程序
    val result = linesDS.flatMap(.split(” “)).map((,1)).groupByKey( x => x._1).count
    result.show

    排序:

    result.orderBy($"value").show
    result.orderBy($"count(1)").show

    2)DataSet操作案例
    使用emp.json 生成一个DataFrame
    val empDF = spark.read.json(“/root/hd/tmp_files/emp.json”)

    查询工资大于3000的员工
    empDF.where($”sal” >= 3000).show

    创建case class

    case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)

    生成DataSet
    val empDS = empDF.as[Emp]

    查询工资大于3000的员工
    empDS.filter(_.sal > 3000).show

    查询10号部门的员工
    empDS.filter(_.deptno == 10).show

    3)多表查询
    (1)创建部门表
    val deptRDD = sc.textFile(“/root/hd/tmp_files/dept.csv”).map(_.split(“,”))
    case class Dept(deptno:Int,dname:String,loc:String)

    val deptDS = deptRDD.map( x=> Dept(x(0).toInt,x(1),x(2))).toDS

    (2)创建员工表
    case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
    val empRDD = sc.textFile(“/root/hd/tmp_files/emp.csv”).map(_.split(“,”))

    7369,SMITH,CLERK,7902,1980/12/17,800,0,20
    val empDS = empRDD.map(x=> Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt)).toDS

    (3)执行多表查询:等值连接
    val result = deptDS.join(empDS,”deptno”)
    result.show
    result.printSchema

    val result1 = deptDS.joinWith(empDS, deptDS(“deptno”) === empDS(“deptno”) )
    result1.show
    result1.printSchema

    join 和 joinWith 区别:连接后schema不同

    join :将两张表展开成一张更大的表
    joinWith :把两张表的数据分别做成一列,然后直接拼在一起

    4)多表连接后再筛选
    deptDS.join(empDS,”deptno”).where(“deptno == 10”).show

    result.explain:执行计划

    7、Spark SQL 中的视图

    视图是一个虚表,不存储数据
    两种类型:
    1)普通视图(本地视图)
    只在当前Session中有效createOrReplaceTempView createTempView

    2)全局视图
    createGlobalTempView
    在不同的Session中都有用,把全局视图创建在命名空间中:global_temp中。类似于一个库

    scala> df1.create
    createGlobalTempView createOrReplaceTempView createTempView

    举例:
    创建一个新session,读取不到emp视图,报错
    df1.createOrReplaceTempView(“emp”)
    spark.sql(“select * from emp”).show
    spark.newSession.sql(“select * from emp”)

    以下两种方式均可读到全局视图中的数据
    df1.createGlobalTempView(“emp1”)

    spark.newSession.sql(“select * from global_temp.emp1”).show

    spark.sql(“select * from global_temp.emp1”).show

    二、使用数据源

    在Spark SQL中,可以使用各种各样的数据源来操作。 结构化

    1、使用load函数、save函数

    load函数是加载数据,save是存储数据

    注意:使用load 或 save时,默认是Parquet文件。列式存储文件

    举例:
    读取 users.parquet 文件
    val userDF = spark.read.load(“/root/hd/tmp_files/users.parquet”)

    userDF.printSchema
    userDF.show

    val userDF = spark.read.load(“/root/hd/tmp_files/emp.json”)

    保存parquet文件

    userDF.select($"name",$"favorite_color").write.save("/root/hd/tmp_files/parquet")

    读取刚刚写入的文件:
    val userDF1 = spark.read.load(“/root/hd/tmp_files/parquet/part-00000-f9a3d6bb-d481-4fc9-abf6-5f20139f97c5.snappy.parquet”)—> 不推荐

    生产中直接读取存放的目录即可:
    val userDF2 = spark.read.load(“/root/hd/tmp_files/parquet”)

    读json文件 必须format
    val userDF = spark.read.format(“json”).load(“/root/hd/tmp_files/emp.json”)
    val userDF3 = spark.read.json(“/root/hd/tmp_files/emp.json”)

    关于save函数
    调用save函数的时候,可以指定存储模式,追加、覆盖等等
    userDF.write.save(“/root/hd/tmp_files/parquet”)

    userDF.write.save(“/root/hd/tmp_files/parquet”)
    org.apache.spark.sql.AnalysisException: path file:/root/hd/tmp_files/parquet already exists.;

    save的时候覆盖
    userDF.write.mode(“overwrite”).save(“/root/hd/tmp_files/parquet”)

    将结果保存成表
    userDF.select($”name”).write.saveAsTable(“table1”)

    scala> userDF.select($”name”).write.saveAsTable(“table1”)

    scala> spark.sql(“select * from table1”).show
    +——+
    | name|
    +——+
    |Alyssa|
    | Ben|
    +——+

    2、Parquet文件

    列式存储文件,是Spark SQL 默认的数据源
    就是一个普通的文件

    举例:
    1)把其他文件,转换成Parquet文件
    调用save函数
    把数据读进来,再写出去,就是Parquet文件

    val empDF = spark.read.json(“/root/hd/tmp_files/emp.json”)
    empDF.write.mode(“overwrite”).save(“/root/hd/tmp_files/parquet”)
    empDF.write.mode(“overwrite”).parquet(“/root/hd/tmp_files/parquet”)

    val emp1 = spark.read.parquet(“/root/hd/tmp_files/parquet”)
    emp1.createOrReplaceTempView(“emp1”)
    spark.sql(“select * from emp1”)

    2)支持Schema的合并
    项目开始 表结构简单 schema简单
    项目越来越大 schema越来越复杂

    举例:
    通过RDD来创建DataFrame
    val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF(“single”,”double”)
    “single”,”double” 是表结构
    df1.show

    df1.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/key=1”)

    val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF(“single”,”triple”)
    df2.show
    df2.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/key=2”)

    合并两个部分
    val df3 = spark.read.parquet(“/root/hd/tmp_files/test_table”)

    val df3 = spark.read.option(“mergeSchema”,true).parquet(“/root/hd/tmp_files/test_table”)

    key是可以随意取名字的,两个key需要一致,不然合并会报错

    通过RDD来创建DataFrame
    val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF(“single”,”double”)
    “single”,”double” 是表结构
    df1.show

    df1.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/kt=1”)

    val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF(“single”,”triple”)
    df2.show
    df2.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/kt=2”)

    合并两个部分
    val df3 = spark.read.parquet(“/root/hd/tmp_files/test_table”)

    val df3 = spark.read.option(“mergeSchema”,true).parquet(“/root/hd/tmp_files/test_table”)

    3、json文件

    读取Json文件,生成DataFrame
    val peopleDF = spark.read.json(“/root/hd/tmp_files/people.json”)

    peopleDF.printSchema

    peopleDF.createOrReplaceTempView(“peopleView”)

    spark.sql(“select * from peopleView”).show

    Spark SQL 支持统一的访问接口。对于不同的数据源,读取进来,生成DataFrame后,操作完全一样

    4、JDBC

    使用JDBC操作关系型数据库,加载到Spark中进行分析和处理

    方式一:

    ./spark-shell --master spark://hsiehchou121:7077 --jars /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --driver-class-path /root/hd/tmp_files/mysql-connector-java-8.0.12.jar
    val mysqlDF = spark.read.format("jdbc")
    .option("url","jdbc:mysql://192.168.116.1/company?serverTimezone=UTC&characterEncoding=utf-8")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("user","root")
    .option("password","123456")
    .option("dbtable","emp").load
    val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.116.1/company?serverTimezone=UTC&characterEncoding=utf-8").option("driver","com.mysql.cj.jdbc.Driver").option("user","root").option("password","123456").option("dbtable","emp").load

    mysqlDF.show

    问题解决:
    如果遇到下面问题,就是你本机的mysql数据库没有权限给你虚拟机访问
    java.sql.SQLException: null, message from server: “Host ‘hsiehchou121’ is not allowed to connect to this MySQL server”

    解决方案

    1)进入你本机的数据库
    mysql -u root -p
    2)use mysql;
    3)修改root用户前面的Host,改为%,意思是全部IP都能访问
    4)flush privileges;

    方式二:
    定义一个Properties类

    import java.util.Properties
    val mysqlProps = new Properties()
    mysqlProps.setProperty("driver","com.mysql.cj.jdbc.Driver")
    mysqlProps.setProperty("user","root")
    mysqlProps.setProperty("password","123456")
    val mysqlDF1 = spark.read.jdbc("jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8","emp",mysqlProps)
    mysqlDF1.show

    5、使用Hive

    比较常见
    (*)spark SQL 完全兼容hive
    (*)需要进行配置
    拷贝一下文件到spark/conf目录下:
    Hive 配置文件: hive-site.xml
    Hadoop 配置文件:core-site.xml hdfs-site.xml

    配置好后,重启spark

    在hive的lib下和spark的jars下面增加mysql-connector-java-8.0.12.jar这边连接数据库的jar包

    启动Hadoop :start-all.sh
    启动 hive:

    hsiehchou121
    cd hive/bin/
    ./hive --service metastore
    hsiehchou122
    cd hive/bin
    ./hive

    hsiehchou121启动问题
    java.sql.SQLSyntaxErrorException: Table ‘hive.version’ doesn’t exist
    解决:去mysql数据库中的hive库下面创建version表
    这里需要给本地的hive库创建下hive所必须用的表

    我们去/root/hd/hive/scripts/metastore/upgrade/mysql这里面找到hive-schema-1.2.0.mysql.sql,将里面的sql语句在hive库中执行

    hive-txn-schema-0.14.0.mysql.sql,这个也做好执行下,用于事务管理

    显示当前所在库名字
    set hive.cli.print.current.db=true;

    j将emp.csv上传到hdfs中的/tmp_files/下面
    hdfs dfs -put emp.csv /tmp_files

    在hive中创建emp_default表

    hive (default)> create table emp(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int,deptno int)
    > row format
    > delimited fields
    > terminated by ",";
    hive (default)> load data inpath '/tmp_files/emp.csv' into table emp;
    Time taken: 1.894 seconds
    hive (default)> show tables;
    hive (default)> select * from emp;

    hdfs dfs -put /root/hd/tmp_files/emp.csv /tmp_files

    [root@hsiehchou121 bin]# ./spark-shell --master spark://hsiehchou121:7077

    启动spatk时,如果出现如下错误
    java.sql.SQLSyntaxErrorException: Table ‘hive.partitions’ doesn’t exist
    在mysql数据库里面创建partitions表

    scala> spark.sql(“select * from emp_default”).show
    scala> spark.sql(“select * from default.emp_default”).show

    spark.sql(“create table company.emp_4(empno Int,ename String,job String,mgr String,hiredate String,sal Int,comm String,deptno Int)row format delimited fields terminated by ‘,’”)
    spark.sql(“load data local inpath ‘/root/hd/tmp_files/emp.csv’ overwrite into table company.emp_4”)

    三、在IDE中开发Spark SQL

    1、创建DataFrame StructType方式

    package day4
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.types.IntegerType
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.Row
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    /**
    * 创建DataFrame StructType方式
    */
    object Demo1 {
    def main(args: Array[String]): Unit = {
    //减少Info日志的打印
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //创建Spark Session对象
    val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()
    //从指定的地址创建RDD对象
    val personRDD = spark.sparkContext.textFile("H:\other\students.txt").map(_.split(" "))
    //通过StructType方式指定Schema
    val schema = StructType(
    List(
    StructField("id", IntegerType),
    StructField("name", StringType),
    StructField("age", IntegerType)))
    //将RDD映射到rowRDD上,映射到Schema上
    val rowRDD = personRDD.map(p => Row(p(0).toInt,p(1),p(2).toInt))
    val personDataFrame = spark.createDataFrame(rowRDD, schema)
    //注册视图
    personDataFrame.createOrReplaceTempView("t_person")
    //执行SQL语句 desc降序 asc 升序
    val df = spark.sql("select * from t_person order by age desc")
    df.show
    spark.stop()
    }
    }

    2、使用case class来创建DataFrame

    package day4
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import org.apache.spark.sql.SparkSession
    /**
    * 使用case class来创建DataFrame
    */
    object Demo2 {
    def main(args: Array[String]): Unit = {
    //减少Info日志的打印
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //创建Spark Session对象
    val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()
    //从指定的地址创建RDD对象
    val lineRDD = spark.sparkContext.textFile("H:\other\students.txt").map(_.split(" "))
    //把数据与case class做匹配
    val studentRDD = lineRDD.map(x => Student(x(0).toInt,x(1),x(2).toInt))
    //生成DataFrame
    import spark.sqlContext.implicits._
    val studentDF = studentRDD.toDF()
    //注册视图,执行SQL
    studentDF.createOrReplaceTempView("student")
    spark.sql("select * from student").show
    spark.stop()
    }
    }
    //定义case class
    case class Student(stuId:Int, stuName:String, stuAge:Int)

    3、写入mysql

    package day4
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.IntegerType
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.types.StructField
    import java.util.Properties
    /**
    * 写入mysql
    */
    object Demo3 {
    def main(args: Array[String]): Unit = {
    //减少Info日志的打印
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //创建Spark Session对象
    val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()
    //从指定的地址创建RDD对象
    val lineRDD = spark.sparkContext.textFile("H:\other\students.txt").map(_.split(" "))
    //通过StructType方式指定Schema
    val schema = StructType(
    List(
    StructField("personID", IntegerType),
    StructField("personName", StringType),
    StructField("personAge", IntegerType)))
    //将RDD映射到rowRDD上,映射到Schema上
    val rowRDD = lineRDD.map(p => Row(p(0).toInt,p(1),p(2).toInt))
    val personDataFrame = spark.createDataFrame(rowRDD, schema)
    personDataFrame.createOrReplaceTempView("myperson")
    val result = spark.sql("select * from myperson")
    result.show
    //把结果存入mysql中
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    result.write.mode("append").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)
    spark.stop()
    }
    }

    4、使用Spark SQL 读取Hive中的数据,将计算结果存入mysql

    package day4
    import org.apache.spark.sql.SparkSession
    import java.util.Properties
    /**
    * 使用Spark SQL 读取Hive中的数据,将计算结果存入mysql
    */
    object Demo4 {
    def main(args: Array[String]): Unit = {
    //创建SparkSession
    val spark = SparkSession.builder().appName("Demo4").enableHiveSupport().getOrCreate()
    //执行SQL
    val result = spark.sql("select deptno,count(1) from company.emp group by deptno")
    //将结果保存到mysql中
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    result.write.jdbc("jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "emp_stat", props)
    spark.stop()
    }
    }

    提交任务

    [root@hsiehchou121 bin]# ./spark-submit --master spark://hsiehchou121:7077 --jars /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --driver-class-path /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --class day4.Demo4 /root/hd/tmp_files/Demo4.jar

    四、性能优化

    与RDD类似

    1、把内存中缓存表的数据

    直接读取内存的值,来提高性能

    RDD中如何缓存:
    rdd.cache 或者 rdd.persist

    在Spark SQL中,使用SparkSession.sqlContext.cacheTable

    spark中所有context对象
    1)sparkContext : SparkCore
    2)sql Context : SparkSQL
    3)Streaming Context :SparkStreaming

    统一起来:SparkSession

    操作mysql,启动spark shell 时,需要:
    ./spark-shell –master spark://hsiehchou121:7077 –jars /root/hd/tmp_files/mysql-connector-java-8.0.12.jar –driver-class-path /root/hd/tmp_files/mysql-connector-java-8.0.12.jar

    val mysqlDF = spark.read.format(“jdbc”).option(“driver”,”com.mysql.cj.jdbc.Driver”).option(“url”,”jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8”).option(“user”,”root”).option(“password”,”123456”).option(“dbtable”,”emp”).load

    mysqlDF.show
    mysqlDF.createOrReplaceTempView(“emp”)

    spark.sqlContext.cacheTable(“emp”) —-> 标识这张表可以被缓存,数据还没有真正被缓存
    spark.sql(“select * from emp”).show —-> 依然读取mysql
    spark.sql(“select * from emp”).show —-> 从缓存中读取数据

    spark.sqlContext.clearCache

    清空缓存后,执行查询,会触发查询mysql数据库

    2、了解性能优化的相关参数

    将数据缓存到内存中的相关优化参数
    spark.sql.inMemoryColumnarStorage.compressed
    默认为 true
    Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式

    spark.sql.inMemoryColumnarStorage.batchSize
    默认值:10000
    缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险

    其他性能相关的配置选项(不过不推荐手动修改,可能在后续版本自动的自适应修改)
    spark.sql.files.maxPartitionBytes
    默认值:128 MB
    读取文件时单个分区可容纳的最大字节数

    spark.sql.files.openCostInBytes
    默认值:4M
    打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)

    spark.sql.autoBroadcastJoinThreshold
    默认值:10M
    用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE COMPUTE STATISTICS noscan 命令的 Hive Metastore 表

    spark.sql.shuffle.partitions
    默认值:200
    用于配置 join 或聚合操作混洗(shuffle)数据时使用的分区数

  • 相关阅读:
    Magento 安装时文件权限 设置
    进度十(10.28)
    进度九(10.27)
    进度八(10.26)
    进度六(10.24)
    进度五(10.23)
    进度四(10.22)
    进度三(10.21)
    进度二(10.20)
    进度一(10.19)
  • 原文地址:https://www.cnblogs.com/hsiehchou/p/10698223.html
Copyright © 2011-2022 走看看