  • Understanding Spark SQL (Part 3): Spark SQL Program Examples

    As mentioned in the previous post, in Spark 2.x SQLContext and HiveContext are deprecated; instead, SQL statements are executed through the sql function of a SparkSession object. Before running a SQL statement with this function, you must first call createOrReplaceTempView on a DataFrame to register a temporary view, so the key step is converting an RDD into a DataFrame. In fact, Spark declares:

    type DataFrame = Dataset[Row]

    so DataFrame is simply an alias for Dataset[Row]. RDDs provide a low-level API, while DataFrame/Dataset provide a high-level API (well suited to SQL and other structured-data scenarios).
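    To make that flow concrete before the full examples, here is a minimal sketch that can be pasted into spark-shell (where a SparkSession already exists as spark); the Person case class and the sample rows are made up purely for illustration:

    // In spark-shell, a SparkSession is already available as `spark`
    import spark.implicits._   // enables toDF() on RDDs of case classes

    // Hypothetical record type, for illustration only
    case class Person(name: String, age: Int)

    // RDD (low-level API) -> DataFrame (high-level API)
    val df = spark.sparkContext
      .parallelize(Seq(Person("Ann", 30), Person("Bob", 25)))
      .toDF()

    // Register a temporary view, then query it through the sql function
    df.createOrReplaceTempView("people")
    spark.sql("select name from people where age > 28").show()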

    Below are several example Spark SQL programs.

    Example 1: SparkSQLExam.scala

    package bruce.bigdata.spark.example

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    object SparkSQLExam {

        case class offices(office: Int, city: String, region: String, mgr: Int, target: Double, sales: Double)

        def main(args: Array[String]) {

            val spark = SparkSession
              .builder
              .appName("SparkSQLExam")
              .getOrCreate()

            runSparkSQLExam1(spark)
            runSparkSQLExam2(spark)

            spark.stop()

        }

        private def runSparkSQLExam1(spark: SparkSession): Unit = {

            import spark.implicits._

            val rddOffices = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p => offices(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
            val officesDataFrame = spark.createDataFrame(rddOffices)

            officesDataFrame.createOrReplaceTempView("offices")
            spark.sql("select city from offices where region='Eastern'").map(t => "City: " + t(0)).collect.foreach(println)

        }

        private def runSparkSQLExam2(spark: SparkSession): Unit = {

            import spark.implicits._
            import org.apache.spark.sql._
            import org.apache.spark.sql.types._

            val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false), StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
            val rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p => Row(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
            val dataFrame = spark.createDataFrame(rowRDD, schema)

            dataFrame.createOrReplaceTempView("offices2")
            spark.sql("select city from offices2 where region='Western'").map(t => "City: " + t(0)).collect.foreach(println)

        }

    }
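    runSparkSQLExam1 builds the DataFrame by reflection over the offices case class, while runSparkSQLExam2 builds it from an RDD[Row] plus an explicit StructType, which is the variant to reach for when column names and types are only known at runtime. With spark.implicits._ in scope, the first variant can also be written more compactly via toDF(); a sketch reusing the path and case class from the example above:

    import spark.implicits._

    // Equivalent to spark.createDataFrame(rddOffices): toDF() works on
    // any RDD whose elements are case-class instances
    val officesDF = spark.sparkContext
      .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
      .map(_.split("\t"))
      .map(p => offices(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
      .toDF()

    officesDF.createOrReplaceTempView("offices")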

    Compile it with the following command:

    [root@BruceCentOS4 scala]# scalac SparkSQLExam.scala

    Note that before compiling, the following paths must be added to the CLASSPATH:

    export CLASSPATH=$CLASSPATH:$SPARK_HOME/jars/*:$(/opt/hadoop/bin/hadoop classpath)

    Then package it into a jar file:

    [root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce
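    For anything beyond a quick experiment, a build tool is more convenient than manual scalac/jar invocations. A minimal build.sbt sketch (assuming sbt is installed; the Spark and Scala versions below are assumptions, not taken from this cluster), after which sbt package produces an equivalent jar:

    name := "spark-exam"
    version := "0.1"
    scalaVersion := "2.11.12"

    // "provided" because spark-submit supplies Spark's jars at runtime;
    // spark-hive is needed for Example 2's enableHiveSupport()
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"  % "2.4.0" % "provided",
      "org.apache.spark" %% "spark-hive" % "2.4.0" % "provided"
    )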

    Then submit the program to the YARN cluster with spark-submit. To make it easy to view the results from the client, it is run here in yarn client mode.

    [root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode client spark_exam_scala.jar

    Screenshot of the run results:

    Example 2: SparkHiveExam.scala (requires the Hive metastore to be running)

    package bruce.bigdata.spark.example

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object SparkHiveExam {

        def main(args: Array[String]) {

            val spark = SparkSession
              .builder()
              .appName("Spark Hive Exam")
              .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
              .enableHiveSupport()
              .getOrCreate()

            import spark.implicits._

            // Browse Hive data with HQL
            spark.sql("show databases").collect.foreach(println)
            spark.sql("use orderdb")
            spark.sql("show tables").collect.foreach(println)
            spark.sql("select city from offices where region='Eastern'").map(t => "City: " + t(0)).collect.foreach(println)

            // Save the result of an HQL query into a newly created Hive table:
            // find the products with order amounts over $10,000
            spark.sql("""create table products_high_sales(mfr_id string,product_id string,description string)
                       ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
            spark.sql("""select mfr_id,product_id,description
                       from products a inner join orders b
                       on a.mfr_id=b.mfr and a.product_id=b.product
                       where b.amount>10000""").write.mode(SaveMode.Overwrite).saveAsTable("products_high_sales")

            // Load data from an HDFS file into a Hive table
            spark.sql("""CREATE TABLE IF NOT EXISTS offices2 (office int,city string,region string,mgr int,target double,sales double)
                       ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
            spark.sql("LOAD DATA INPATH '/user/hive/warehouse/orderdb.db/offices/offices.txt' INTO TABLE offices2")

            spark.stop()
        }
    }

    Compile it with the following command:

    [root@BruceCentOS4 scala]# scalac SparkHiveExam.scala

    Package it with the following command:

    [root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

    Run it with the following command:

    [root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkHiveExam --master yarn --deploy-mode client spark_exam_scala.jar

    Program run results:

    In addition, after the program above runs, Hive contains two new tables:
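    The two new tables can be checked straight from the same API; a short sketch (query only, assuming the job above completed successfully):

    // Read back the tables the example job created,
    // e.g. from spark-shell started with Hive support
    spark.sql("select * from products_high_sales").show()
    spark.sql("select office, city, sales from offices2").show()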

     

    Example 3: spark_sql_exam.py

    from __future__ import print_function

    from pyspark.sql import SparkSession
    from pyspark.sql.types import *


    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("Python Spark SQL exam") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()

        schema = StructType([StructField("office", IntegerType(), False), StructField("city", StringType(), False),
            StructField("region", StringType(), False), StructField("mgr", IntegerType(), True),
            StructField("target", DoubleType(), True), StructField("sales", DoubleType(), False)])

        rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(lambda p: p.split("\t")) \
            .map(lambda p: (int(p[0].strip()), p[1], p[2], int(p[3].strip()), float(p[4].strip()), float(p[5].strip())))

        dataFrame = spark.createDataFrame(rowRDD, schema)
        dataFrame.createOrReplaceTempView("offices")
        spark.sql("select city from offices where region='Eastern'").show()

        spark.stop()

    Run the program with the following command:

    [root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client spark_sql_exam.py

    Program run results:

    Example 4: JavaSparkSQLExam.java

    package bruce.bigdata.spark.example;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.AnalysisException;


    public class JavaSparkSQLExam {
        public static void main(String[] args) throws AnalysisException {
            SparkSession spark = SparkSession
              .builder()
              .appName("Java Spark SQL exam")
              .config("spark.some.config.option", "some-value")
              .getOrCreate();

            List<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("office", DataTypes.IntegerType, false));
            fields.add(DataTypes.createStructField("city", DataTypes.StringType, false));
            fields.add(DataTypes.createStructField("region", DataTypes.StringType, false));
            fields.add(DataTypes.createStructField("mgr", DataTypes.IntegerType, true));
            fields.add(DataTypes.createStructField("target", DataTypes.DoubleType, true));
            fields.add(DataTypes.createStructField("sales", DataTypes.DoubleType, false));

            StructType schema = DataTypes.createStructType(fields);

            JavaRDD<String> officesRDD = spark.sparkContext()
              .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt", 1)
              .toJavaRDD();

            JavaRDD<Row> rowRDD = officesRDD.map((Function<String, Row>) record -> {
              String[] attributes = record.split("\t");
              return RowFactory.create(Integer.valueOf(attributes[0].trim()), attributes[1], attributes[2], Integer.valueOf(attributes[3].trim()), Double.valueOf(attributes[4].trim()), Double.valueOf(attributes[5].trim()));
            });

            Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema);

            dataFrame.createOrReplaceTempView("offices");
            Dataset<Row> results = spark.sql("select city from offices where region='Eastern'");
            results.collectAsList().forEach(r -> System.out.println(r));

            spark.stop();
        }
    }

    After compiling and packaging, run it with the following command:

    [root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.JavaSparkSQLExam --master yarn --deploy-mode client spark_exam_java.jar

    Run results:

    The above are some examples of Spark SQL programs, written in Scala, Python, and Java respectively. Besides these three languages, Spark also supports programs written in R; since I am not familiar with it myself, no example is given. Whichever language you use, the API is essentially the same: you mainly use the high-level DataFrame and Dataset APIs to invoke and execute SQL. With these APIs, structured data can easily be manipulated through SQL, and data in Hive can be accessed conveniently as well.
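    As a final illustration of that high-level API, here is a hedged Scala sketch showing that the same query can also be expressed with the typed Dataset API instead of a SQL string (reusing officesDataFrame and the offices case class from Example 1):

    import spark.implicits._

    // Typed alternative to spark.sql("select city from offices where region='Eastern'"):
    // as[offices] yields a Dataset[offices], so field access is checked at
    // compile time instead of being parsed out of a SQL string
    val officesDS = officesDataFrame.as[offices]
    officesDS.filter(_.region == "Eastern")
             .map(o => "City: " + o.city)
             .collect
             .foreach(println)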
