    Spark--SQL

    SQL

    In a program, the result of executing a SQL statement is returned as a DataFrame.
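
    A minimal sketch (assuming a SQLContext has been created and a temp table named "people" has been registered, as in the examples below):

    // sqlContext.sql() returns a DataFrame
    val df: org.apache.spark.sql.DataFrame = sqlContext.sql("SELECT name, age FROM people")
    df.show()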

    DataFrames

    A DataFrame is a distributed collection of data organized into named columns, conceptually similar to a table in a relational database.

    A DataFrame can be constructed from structured data files, Hive tables, RDDs, or external databases; JSON is a semi-structured file format.
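
    The other sources listed above are read in much the same way. A rough sketch, where the JDBC URL, the table name, and the Hive table "src" are placeholder assumptions (the JDBC driver must be on the classpath, and the Hive example needs the spark-hive dependency):

    // External database via JDBC (url and dbtable are placeholders)
    val jdbcDF = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/test")
      .option("dbtable", "people")
      .load()

    // Hive table (requires a HiveContext built on the same SparkContext)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val hiveDF = hiveContext.table("src")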

    DataFrame operations

    import org.apache.spark.sql.{Column, DataFrame, SQLContext}
    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
      * Created by Edward on 2016/9/6.
      */
    object DFTest {
      def main(args: Array[String]) {
    
        val conf: SparkConf = new SparkConf().setAppName("DF").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
    
        val sqlContext: SQLContext = new SQLContext(sc)
    
        val df: DataFrame = sqlContext.read.json("D:\\documents\\Spark\\MyDemo\\Test\\res\\people.json")
        
        // Show the content of the DataFrame
        df.show()
        // age  name
        // null Michael
        // 30   Andy
        // 19   Justin

        // Print the schema in a tree format
        df.printSchema()
        // root
        // |-- age: long (nullable = true)
        // |-- name: string (nullable = true)

        // Select only the "name" column
        df.select("name").show()
        // name
        // Michael
        // Andy
        // Justin

        // Select everybody, but increment the age by 1
        df.select(df("name"), df("age") + 1).show()
        // name    (age + 1)
        // Michael null
        // Andy    31
        // Justin  20

        // Select people older than 21
        df.filter(df("age") > 21).show()
        // age name
        // 30  Andy

        // Count people by age
        df.groupBy("age").count().show()
        // age  count
        // null 1
        // 19   1
        // 30   1

        sc.stop()
      }
    }

    Contents of the JSON file being read:

    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}

    For more operations, see: http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.DataFrame

    Two ways to convert RDDs to DataFrames

    JSON and Parquet files can be converted to DataFrames directly; for plain data files, the RDD read from the file must be converted to a DataFrame.

    1. Reflection (concise; the table schema is specified via a case class)

    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
      * Created by Edward on 2016/9/6.
      */
    object Reflection {
    
      // Define the schema using a case class.
      // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
      // you can use custom classes that implement the Product interface.
      case class Person(name: String, age: Int)
    
      def main(args: Array[String]) {
    
        val conf: SparkConf = new SparkConf().setAppName("Reflection").setMaster("local")
    
        val sc: SparkContext = new SparkContext(conf)
    
        // sc is an existing SparkContext.
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        // this is used to implicitly convert an RDD to a DataFrame.
        import sqlContext.implicits._
    
        // Create an RDD of Person objects and register it as a table.
        val people = sc.textFile("res/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
        people.registerTempTable("people")
    
        // SQL statements can be run by using the sql methods provided by sqlContext.
        val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
    
        // The results of SQL queries are DataFrames and support all the normal RDD operations.
        // The columns of a row in the result can be accessed by field index:
        teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    
        // or by field name:
        teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
    
        // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
        teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
        // Map("name" -> "Justin", "age" -> 19)
    
        sc.stop()
      }
    }

    2. Programmatic (dynamically specifying the schema)

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
      * Created by Edward on 2016/9/6.
      */
    object Programmatic {
    
      def main(args: Array[String]) {
    
        val conf = new SparkConf().setAppName("Programmatic").setMaster("local")
    
        val sc = new SparkContext(conf)
    
        val sqlContext = new SQLContext(sc)
    
        // Create an RDD
        val people = sc.textFile("res/people.txt")
    
        // The schema is encoded in a string
        val schemaString = "name age"
    
        // Import Row.
        import org.apache.spark.sql.Row;
    
        // Import Spark SQL data types
        import org.apache.spark.sql.types.{StructType, StructField, StringType};
    
        // Generate the schema based on the string of schema
        val schema =
          StructType(
            schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    
        // Convert records of the RDD (people) to Rows.
        val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    
        // Apply the schema to the RDD.
        val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    
        // Register the DataFrames as a table.
        peopleDataFrame.registerTempTable("people")
    
        // SQL statements can be run by using the sql methods provided by sqlContext.
        val results = sqlContext.sql("SELECT name FROM people")
    
        // The results of SQL queries are DataFrames and support all the normal RDD operations.
        // The columns of a row in the result can be accessed by field index or by field name.
        results.map(t => "Name: " + t(0)).collect().foreach(println)
    
        sc.stop()
    
      }
    }

    Data sources

    The default data source is Parquet.

    import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
    import org.apache.spark.{SparkContext, SparkConf}
    
    
    /**
      * Created by Edward on 2016/9/7.
      */
    object Parquet {
      def main(args: Array[String]) {
    
        val sparkConf: SparkConf = new SparkConf().setAppName("Parquet").setMaster("local")
        val sc = new SparkContext(sparkConf)
    
        val sqlContext  = new SQLContext(sc)
    
        //val parquet = sqlContext.read.parquet("res/users.parquet")
        //val parquet = sqlContext.read.load("res/users.parquet")  // the default data source is parquet; it can be changed via spark.sql.sources.default
        //val parquet  = sqlContext.read.parquet("new.parquet/*.parquet")  // glob patterns can be used
        //val json = sqlContext.read.format("json").load("res/people.json") // read a JSON file, specifying the format with format()
        val json = sqlContext.read.json("res/people.json") // read a JSON file directly with the json() method

        //json.select("name","age").write.mode(SaveMode.Overwrite).save("new.parquet") // saved as a parquet file by default
        json.select("name","age").write.mode(SaveMode.Overwrite).format("json").save("jsonfile") // save as JSON; "jsonfile" is the output directory
        json.show()
        //parquet.show()
    
        sc.stop()
      }
    }
    Original post: https://www.cnblogs.com/one--way/p/5845971.html