zoukankan      html  css  js  c++  java
  • 【Spark深入学习 -16】官网学习SparkSQL

    ----本节内容-------
    1.概览
            1.1 Spark SQL
            1.2 DatSets和DataFrame
    2.动手干活
            2.1 契入点:SparkSession
            2.2 创建DataFrames
            2.3 非强类型结果集操作
            2.4 程序化执行SQL查询
            2.5 全局临时视图
            2.6 创建DataSets
            2.7 与RDD交互操作
            2.8 聚集函数
    3.Spark数据源
            3.1 通用Load/Save函数
            3.2 Parquets文件格式
                3.2.1 读取Parquet文件
                3.2.2 解析分区信息
                3.2.3 Schema合并
                3.2.4 Hive元与Parquet表转换
            3.3 JSON数据集
            3.4 Hive表
            3.5 JDBC连接其他库
    4 性能调优
            4.1 缓存数据至内存
            4.2 调优参数
    5 分布式SQL引擎
            5.1 运行Thrift JDBC/ODBC服务
            5.2 运行Spark SQL CLI
    6.参考资料
    ---------------------

    最近好几个好友和Spark官方文档杠上了,准备共同整理一下Spark官方文档,互相分享研究心得故有此一篇。本次参考官网的说明,着重介绍SparkSQL,结合官网提供的重要内容以及自己的理解做一次学习笔记,主要是针对spark2.0的官方文档,本文不是对官网文档的翻译,但是主要参考内容来自官方文档。

    1.概览

    Spark SQL是Spark的一个组件,能够很好的处理结构化数据。Spark SQL记录了更多数据结构化信息, 所以相比RDD,可以更好的处理结构化数据,并且具有更好的性能(Spark SQL都记录了啥信息这么能干,举个简单的例子,一般的数据库读表中某个字段的数据,先拿到字段内容,然后还要去元数据表获得这个字段表示什么含义,来来回回的查,效率低,Spark SQL就不需要这样,他一拿到字段内容就知道是什么意思,因为他记录了字段的含义,不需要去查元数据表来感知这个字段什么含义,效率就提升了)。可以使用 SQL或者DataSet与Spark SQL进行交互。不管你使用Java、Scala还是python语言,Spark SQL底层计算引擎都是一样的,所以支持很多语言开发,随你挑,随你用什么语言开发。
    1.1 Spark SQL
    Spark SQL无缝集成Hive的sql语法,只需要做一些简单的配置,怎么配置,自行百度。通过SQL查询返回的结果是Dataset/DataFrame,也支持命令行或者JDBC的方式连接Spark SQL.
    1.2 DatSets和DataFrame
    官网真的很啰嗦,唠唠叨叨的,这一段其实就讲了这么几个要点
    1)DataSet在spark1.6开始支持
    2)DataFrame是一个分布式的数据集合,该数据集合以命名列的方式进行整合。DataFrame可以理解为关系数据库中的一张表,也可以理解为R/Python中的一个data frame。DataFrames可以通过多种数据构造,例如:结构化的数据文件、hive中的表、外部数据库、Spark计算过程中生成的RDD等。
    3)DataFrame的API支持4种语言:Scala、Java、Python、R。但是对python和R支持的不是很好,对Java和scala支持很好,


    2.动手干活

    本例子演示如何创建SparkSession,创建DataFrames,操作Dataset 
    启动Spark shell,
    命令: bin/spark-shell --master spark://master01:7077
    ----------------------
    import org.apache.spark.sql.SparkSession
    import spark.implicits._
    val spark=SparkSession.builder().appName("test").config("spark.some.config.option","some-value").getOrCreate();
    val df = spark.read.json("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json");
    df.show()
    df.printSchema()
    df.select("name").show()
    df.select($"name", $"age" + 1).show()
    df.filter($"age" > 21).show()
    df.groupBy("age").count().show()
    ----------------------
    2.1 契入点:SparkSession
    入口点是SparkSession类,使用SparkSession.builder()可以获得SparkSession对象
    2.2 创建DataFrames
    使用创建好的sparkSession来创建DataFrames,DataFrame可以来自RDD,或者Spark数据源
    如hive,或则其他数据源,下面这个例子是读取json数据,数据源格式,people.json在spark自带文件夹内examples/src/main/resources/people.json
    --------------------------
    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}
    --------------------------
    2.3 非强类型结果集操作
    DataFrame在Scala、Java、Python和R中为结构化数据操作提供了一个特定领域语言支持。在Spark2.0中,在Scala和Java的API中,DataFrame仅仅是Dataset的RowS表示。与Scala/Java中的强类型的“带类型转换操作”相比,这些操作也可以看做“无类型转换操作”。
    打印出来表结构
    打印列名为name的所有列内容

    选择name和age列,并且age都加1
    过滤出age大于21的记录

    根据年龄分组,并统计个数
     

    除了简单的列引用和表达式外,Dataset同时有丰富的函数库,包括字符串操作、日期算法、常用数学操作等。完整的列表可参考DataFrame Function Reference。
    2.4 程序化执行SQL查询
     Sparksession中的sql函数使得应用可以编程式执行SQL查询语句并且已DataFrame形式返回:
     
    2.5 全局临时视图
    临时视图是基于session级别的,创建视图的session一旦挂掉临时视图的生命也就到此为止了,使用全局视图,可以避免这样的惨剧发生。
    df.createGlobalTempView("people")
    spark.sql("SELECT * FROM global_temp.people").show()
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
     
    2.6 创建DataSets
    Dataset与RDD很像,不同的是它并不使用Java序列化或者Kryo,而是使用特殊的编码器来为网络间的处理或传输的对象进行序列化。对转换一个对象为字节的过程来说编码器和标准系列化器都是可靠的,编码器的代码是自动生成并且使用了一种格式,这种格式允许Spark在不需要将字节解码成对象的情况下执行很多操作,如filtering、sorting和hashing等。
    case class Person(name: String, age: Long)
    //创建一个Person然后转化为DataSet
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()

    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect()

    //换成你本地spark安装路径即可
    val path = "file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show()
    备注说明:
    1)case 类在scala2.1.0最多支持22个字段
    2)编码器默认导入spark.implicits._
    3)通过制定类名,DataFrame可以自动转为DataSet
     
    2.7 与RDD交互操作
    Spark SQL支持两种将已存在的RDD转化为Dataset的方法。第一种方法:使用反射推断包含特定类型对象的RDD的结构。这种基于反射的方法代码更加简洁,并且当你在写Spark程序的时候已经知道RDD的结构的情况下效果很好。第二种方法:创建Dataset的方法是通过编程接口建立一个结构,然后将它应用于一个存在的RDD。虽然这种方法更加繁琐,但它允许你在运行之前不知道其中的列和对应的类型的情况下构建Dataset。
     
    使用反射推断结构
    Spark SQL的Scala接口支持自动的将一个包含case class的RDD转换为DataFrame。这个case class定义了表结构。Caseclass的参数名是通过反射机制读取,然后变成列名。Caseclass可以嵌套或者包含像Seq或Array之类的复杂类型。这个RDD可以隐式的转换为一个DataFrame,然后被注册为一张表。这个表可以随后被SQL的statement使用。
    -----------------------
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.Encoder
    import spark.implicits._
    //读取txt文件->切分文件->将切分后的内容作为参数传递给Person类构建对象->转为dataset
    val peopleDF=spark.sparkContext.textFile("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
    //注册一个dataset临时视图
    peopleDF.createOrReplaceTempView("people")
    //使用sql执行标准sql语句,这里的name和age是Person类的成员对象,要保持一致
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 20")
    //使用map进行转换,teenager 是一个数组,根据下标取得数据
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    teenagersDF.map(teenager => "Age: " + teenager(1)).show()
    //根据列名取数
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    //dataset要进行map操作,要先定义一个Encoder,不支持map会给升级带来较大麻烦
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()

    -----------people.txt数据-----------

     
    通过编程接口指定Schema
    当JavaBean不能被预先定义的时候(比如不同用户解析同一行,解析结果字段可能就不同),编程创建DataFrame分为三步:
      ● 从原来的RDD创建一个Row格式的RDD
      ● 创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema
      ● 通过SparkSession提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema
    ----------------------------------
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    val peopleRDD = spark.sparkContext.textFile("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
    //定义一个shema列名,string类型
    val schemaString = "name age"
    //根据schema列名生成schema,通过StructType方式生成schema
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields)
    //将RDD记录转为RowS 形式
    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
    // 创建dataFrame,将schema和文件内容RDD结合在一起了 val peopleDF = spark.createDataFrame(rowRDD, schema)
    //创建临时视图
    peopleDF.createOrReplaceTempView("people")
    //执行sql
    val results = spark.sql("SELECT name FROM people")
    results.map(attributes => "Name: " + attributes(0)).show()

    2.8 聚集函数
    DataFrames内置了常见的聚合函数,如min,max,count,distinct等,都是为DataFrame,用户也可以定义自己的聚集函数
    ------------------
    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
     
    object MyAverage extends UserDefinedAggregateFunction {
    def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
    def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
    }
    def dataType: DataType = DoubleType
    def deterministic: Boolean = true
    def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
    }
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    buffer(1) = buffer.getLong(1) + 1
    }
    }
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
    def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble/buffer.getLong(1)
    }
     
    //////////////////////////////////////////////////
    spark.udf.register("myAverage", MyAverage)
    val df = spark.read.json("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/employe.json")
    df.createOrReplaceTempView("employees") df.show()
     
    val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show()
     
    备注:
    1)employe.json,这个文件在我的网盘下,spark没有自带该文件
    链接:http://pan.baidu.com/s/1bpqzII7 密码:kuyv
    2)如果你是file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/employe.json方式读取本地文件,你得将employe.json分发到各个节点指定的目录examples/src/main/resources/employe下面,否则会报错。 
     
    3.Spark数据源
    Spark SQL通过DataFrame接口,可以支持对多种数据源的操作。DataFrame可以使用关系转换来进行操作,而且可以用来创建临时视图。将DataFrame注册为临时视图可以允许你在数据上运行SQL查询语句。本节讲解使用SparkData Source加载数据和保存数据的通用方法,然后
    详细讲述内部支持的数据源可用的特定操作。
    3.1 通用Load/Save函数
    Spark默认数据源格式将被用于所有的操作,默认是parquet文件格式,使用spark.sql.sources.default指定默认文件格式
    --------------------
    val usersDF = spark.read.load("/tmp/namesAndAges.parquet") usersDF.select("name", "age").write.save("namesAndAgestest.parquet")
    --------------------
    手动指定文件格式
     你可以手动指定数据源以及数据源附带的额外选项。数据源被他们的完全限定名来指定(如,org.apache.spark.sql.parquet),但对于内部支持的数据源,你可以使用短名(json,parquet,jdbc)。DataFrame可以使用这种语法从任何可以转换为其他类型的数据源加载数据。
    val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("/tmp/namesAndAges.parquet")
    在文件上直接执行SQL
    除了使用读取API加载一个文件到DATAFrame然后查询它的方式,你同样可以通过SQL直接查询文件。
    val sqlDF=spark.sql("SELECT name FROM parquet.`/tmp/namesAndAges.parquet`")
    保存模式
    保存操作可选SaveMode,它指定了如何处理现有的数据。需要重视的一点是这些保存模式没有使用任何的锁,并且不具有原子性。此外,当执行Overwrite时,数据将先被删除,然后写出新数据。
    (1)Overwrite
    如果有文件存在,新内容会覆盖原有内容
    ---------------
    import org.apache.spark.sql.SaveMode
    val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
    //覆盖模式,原来有文件存在会先删除,在写入
    peopleDF.select("name", "age").write.mode(SaveMode.Overwrite).save("/tmp/test1.parquet")
    //读取刚刚写入程序 
    val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()
    (2)Append
    如果文件存在,就在原有的文件中追加新增内容
    ----------------------
    import org.apache.spark.sql.SaveMode
    val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
    //覆盖模式,原来有文件存在会先删除,在写入
    peopleDF.select("name", "age").write.mode(SaveMode.Append).save("/tmp/test1.parquet")
    //读取刚刚写入程序 
    val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()
    (3)Ignore
    如果有文件存在, 则不发生任何事情,和create table if not exists 一样的功能
    peopleDF.select("name", "age").write.mode(SaveMode.Ignore).save("/tmp/test1.parquet")
    //读取刚刚写入程序 
    val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()
    (4)ErrorIfExists
    如果文件存在,就报错,默认就是这个模式
    如果有文件存在, 则不发生任何事情,和create table if not exists 一样的功能
    peopleDF.select("name", "age").write.mode(SaveMode.ErrorIfExists).save("/tmp/test1.parquet")
    //读取刚刚写入程序 
    val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()
    保存数据到hive表
    可以通过saveAsTable方法将DataFrames存储到表中,现有的hive版本不支持该功能。与registerTempTable方法不同的是,saveAsTable将DataFrame中的内容持久化到表中,并在HiveMetastore中存储元数据。存储一个DataFrame,可以使用SQLContext的table方法。table先创建一个表,方法参数为要创建的表的表名,然后将DataFrame持久化到这个表中。
    默认的saveAsTable方法将创建一个“managed table”,表示数据的位置可以通过metastore获得。当存储数据的表被删除时,managed table也将自动删除。
    目前,saveAsTable不支持一外表的方式将dataframe内容保存到外表,需要打一个patch才能实现
    从spark2.1开始,持久化源数据到表中的元数据,在hive中也可以进行分区存储,这样
    · 查询时只需要返回需要的分区数据,不需要查询全部分区数据
    · DDL语句,如ALTER TABLE PARTITION ... SET LOCATION,这样的语句可以使用Datasource API来实现。
     
    3.2 Parquets文件格式
    3.2.1 读取Parquet文件(Loading Data Programmatically)
    Parquet是一种支持多种数据处理列存储数据格式,Parquet文件中保留了原始数据的模式。Spark SQL提供了Parquet文件的读写功能。
    读取Parquet文件示例如下:
    ---------------
    import spark.implicits._
    val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
    peopleDF.write.parquet("/tmp/people.parquet")
    val parquetFileDF = spark.read.parquet("people.parquet")
    parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show()
    --------------------
    3.2.2 解析分区信息
    对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。Parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构:

    通过传递path/to/table给SparkSession.read.parquet or SparkSession.read.load,Spark SQL将自动解析分区信息。返回的DataFrame的Schema如下:
    需要注意的是,数据的分区列的数据类型是自动解析的。当前,支持数值类型和字符串类型。自动解析分区类型的参数为:spark.sql.sources.partitionColumnTypeInference.enabled,默认值为true。如果想关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不再进行类型解析。注意要解析的路径写法问题,是写相对路径还是绝对路径,
    3.2.3 Schema合并
    像ProtocolBuffer、Avro和Thrift那样,Parquet也支持Schema evolution(Schema演变)。用户可以先定义一个简单的Schema,然后逐渐的向Schema中增加列描述。通过这种方式,用户可以获取多个有不同Schema但相互兼容的Parquet文件。现在Parquet数据源能自动检测这种情况,并合并这些文件的schemas。
    因为Schema合并是一个高消耗的操作,在大多数情况下并不需要,所以Spark SQL从1.5.0开始默认关闭了该功能。可以通过下面两种方式开启该功能:
      ● 当数据源为Parquet文件时,将数据源选项mergeSchema设置为true
      ● 设置全局SQL选项spark.sql.parquet.mergeSchema为true
    ----------------
    import spark.implicits._
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("/tmp/test_table1/key=1")
    squaresDF .printSchema()
     
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("/tmp/test_table1/key=2")
    cubesDF.printSchema()
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("/tmp/test_table1")
    mergedDF.printSchema()
     
    3.2.4 Hive metastore Parquet表转换
    Spark SQL缓存了Parquet元数据以达到良好的性能。当Hive metastore Parquet表转换为enabled时,表修改后缓存的元数据并不能刷新。所以,当表被Hive或其它工具修改时,则必须手动刷新元数据,以保证元数据的一致性。示例如下:
    --------------
     sqlContext.refreshTable("my_table")
    --------------
    配置Parquet可以使用SparkSession的setConf方法或使用SQL执行SET key=value命令。详细参数说明如下:
    3.3 JSON数据集
    Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。
    需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下:
    --------------- val path = "file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) peopleDF.printSchema() peopleDF.createOrReplaceTempView("people") val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show()
    val otherPeopleRDD = spark.sparkContext.makeRDD("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleRDD)
    otherPeople .printSchema() otherPeople.show()
     

     
    3.4 Hive表
     ------------------
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession case class Record(key: Int, value: String) val warehouseLocation = "/user/hive/warehouse" val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate() import spark.implicits._
    import spark.sql sql("CREATE TABLE IF NOT EXISTS src(key INT, value STRING)") sql("LOAD DATA LOCAL INPATH 'file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/kv1.txt' INTO TABLE src") sql("SELECT * FROM src").show()
    通过hive可以发下新增的src表
    sql("SELECT COUNT(*) FROM src").show() val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
    sqlDF .show() // The items in DaraFrames are of type Row, which allows you to access each column by ordinal. val stringsDS = sqlDF.map {
    case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    // You can also use DataFrames to create temporary views within a SparkSession. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    -------------
    备注:
    1)将hive-site.xml拷贝到spark conf目录下,所有节点都要有
    2)将mysql驱动包分发到spark所有节点的jar目录下
    3.4.1 访问不同版本的Hive Metastore
     Spark SQL经常需要访问Hive metastore,Spark SQL可以通过Hive metastore获取Hive表的元数据。从Spark 1.4.0开始,Spark SQL只需简单的配置,就支持各版本Hive metastore的访问。注意,涉及到metastore时Spar SQL忽略了Hive的版本。Spark SQL内部将Hive反编译至Hive 1.2.1版本,Spark SQL的内部操作(serdes, UDFs, UDAFs, etc)都调用Hive 1.2.1版本的class。版本配置项见下面表格:
    3.5 JDBC连接其他库
     
    bin/spark-shell --driver-class-path jars/ojdbc14.jar --jars jars/ojdbc14.jar --master spark://master01:7077
    ----------------------
    val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.1.121:3306/sjh") .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "dataArrive").option("user", "maptest") .option("password", "maptest789").load()
    jdbcDF.show()
    //创建临时视图
    jdbcDF.createOrReplaceTempView("dataArrvie")
    //执行sql
    val results = spark.sql("SELECT * FROM dataArrvie")
    results.map(attributes => "JobaName: " + attributes(0)).show()
    ----------------------写数据到mysql
    import java.util.Properties 
     
    val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.1.121:3306/sjh") .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "ApplyJob").option("user", "maptest") .option("password", "maptest789").load()
    jdbcDF.createOrReplaceTempView("ApplyJob")
    //执行sql
    val results = spark.sql("SELECT * FROM ApplyJob")
     
    results.write.mode("append").jdbc("jdbc:mysql://192.168.1.121:3306/sjh", "ApplyJob2", connectionProperties)
     故障排除(Troubleshooting)
      ● 在客户端session和所有的executors上,JDBC driver必须对启动类加载器(primordial class loader)设置为visible。因为当创建一个connection时,Java的DriverManager类会执行安全验证,安全验证将忽略所有对启动类加载器为非visible的driver。一个很方便的解决方法是,修改所有worker节点上的compute_classpath.sh脚本,将driver JARs添加至脚本。
      ● 有些数据库(例:H2)将所有的名字转换为大写,所以在这些数据库中,Spark SQL也需要将名字全部大写。


    4 性能调优
    4.1 缓存数据至内存
    Spark SQL可以通过调用spark.catalog.cacheTable("tableName") 或者dataFrame.cache(),将表用列式存储格式( an in&shy;memory columnar format)缓存至内存中。然后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减少扫描数据量、提高性能。通过缓存数据,Spark SQL还可以自动调节压缩,从而达到最小化内存使用率和降低GC压力的目的。调用sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。可通过两种配置方式开启缓存数据功能:
      ● 使用SparkSession的setConf方法
      ● 执行SQL命令 SET key=value
    4.2 调优参数
    可以通过配置下表中的参数调节Spark SQL的性能。在后续的Spark版本中将逐渐增强自动调优功能,下表中的参数在后续的版本中或许将不再需要配置。


    5 分布式SQL引擎
    使用Spark SQL的JDBC/ODBC或者CLI,可以将Spark SQL作为一个分布式查询引擎。终端用户或应用不需要编写额外的代码,可以直接使用Spark SQL执行SQL查询。
    5.1 运行Thrift JDBC/ODBC服务
    这里运行的Thrift JDBC/ODBC服务与Hive 1.2.1中的HiveServer2一致。可以在Spark目录下执行如下命令来启动JDBC/ODBC服务,
    命令:./sbin/start-thriftserver.sh
    这个命令接收所有 bin/spark-submit 命令行参数,添加一个 --hiveconf 参数来指定Hive的属性。详细的参数说明请执行
    命令: ./sbin/start-thriftserver.sh --help 
    服务默认监听端口为localhost:10000。有两种方式修改默认监听端口:
      ● 修改环境变量:
    export HIVE_SERVER2_THRIFT_PORT=<listening-port>
    export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
     ./sbin/start-thriftserver.sh
     --master <master-uri> ...
      ● 修改系统属性
    ./sbin/start-thriftserver.sh
    --hiveconf hive.server2.thrift.port=<listening-port>
     --hiveconf hive.server2.thrift.bind.host=<listening-host>
    --master <master-uri> ...
     
    使用 beeline 来测试Thrift JDBC/ODBC服务:
    ./bin/beeline
    连接到Thrift JDBC/ODBC服务
    beeline> !connect jdbc:hive2://localhost:10000
    在非安全模式下,只需要输入机器上的一个用户名即可,无需密码。在安全模式下,beeline会要求输入用户名和密码。安全模式下的详细要求,请阅读beeline documentation的说明。
    配置Hive需要替换 conf/ 目录下的 hive-site.xml。
    Thrift JDBC服务也支持通过HTTP传输发送thrift RPC messages。开启HTTP模式需要将下面的配参数配置到系统属性或 conf/: 下的 hive-site.xml中
    hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
     hive.server2.http.endpoint - HTTP endpoint; default is cliservice
    测试http模式,可以使用beeline链接JDBC/ODBC服务:
    beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
    5.2 运行Spark SQL CLI
    Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。需要注意的是,Spark SQL CLI不能与Thrift JDBC服务交互。
    在Spark目录下执行如下命令启动Spark SQL CLI:
    ./bin/spark-sql
    配置Hive需要替换 conf/ 下的 hive-site.xml 。
    执行 ./bin/spark-sql --help 可查看详细的参数说明 。
     
    6.参考资料
    1)http://spark.apache.org/docs/latest/sql-programming-guide.html
      Spark SQL官方网站
    2) http://www.cnblogs.com/BYRans/p/5057110.html
    3).http://www.cnblogs.com/BYRans/p/5057110.html
    4).http://blog.csdn.net/yhao2014/article/details/52215966
    5).http://www.tuicool.com/articles/yEZr6ve case class与普通class区别

  • 相关阅读:
    Docker+geoserver发布shp地图服务
    Docker中运行命令时提示:Cannot connect to the Docker daemony...以及设置docker开机启动
    Docker在服务器之间怎样导入导出镜像(服务器之间容器复制)
    Docker+Tomcat+geoserver+shp发布地图服务
    Windows中将文件压缩成linux支持的tar.gz格式的压缩包
    Docker怎样提交镜像(把容器打成镜像)
    Docker中宿主机与容器之间互传文件(docker cp的方式)
    移动零
    旋转数组
    有序数组的平方
  • 原文地址:https://www.cnblogs.com/licheng/p/6822453.html
Copyright © 2011-2022 走看看