  • Spark Study Notes (3): Spark SQL

    References: https://spark.apache.org/docs/latest/sql-programming-guide.html#overview

       http://www.csdn.net/article/2015-04-03/2824407

    Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

    1) In Spark, a DataFrame is a distributed dataset built on top of RDDs, analogous to a two-dimensional table in a traditional database. The key difference from an RDD is that a DataFrame carries schema metadata: every column of the two-dimensional dataset it represents has a name and a type. This gives Spark SQL insight into the structure of the data, letting it apply targeted optimizations both to the data sources behind a DataFrame and to the transformations applied on top of it, which can greatly improve runtime efficiency. An RDD, by contrast, exposes nothing about the internal structure of the elements it stores, so Spark Core can only perform simple, generic pipeline optimizations at the stage level. (A short sketch of inspecting a DataFrame's schema follows the code below.)

    2)A DataFrame can be operated on as normal RDDs and can also be registered as a temporary table. Registering a DataFrame as a table allows you to run SQL queries over its data.

    3)The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.

    val df = sqlContext.sql("SELECT * FROM table")   // the sql interface
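
    The schema metadata described in point 1 can be inspected directly on a DataFrame. A minimal sketch, assuming the people.json file that ships with the Spark examples:

    // Load a DataFrame from JSON (Spark 1.3-era API, also used further below).
    val df = sqlContext.jsonFile("examples/src/main/resources/people.json")

    // Unlike an RDD, the DataFrame knows its column names and types,
    // which is what lets Spark SQL optimize queries over it.
    df.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)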

    Creating DataFrames:

    With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources. 

    val sc: SparkContext // An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._
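
    Note that a plain SQLContext cannot query Hive tables; for the Hive case mentioned above, Spark provides a HiveContext, a superset of SQLContext. A minimal sketch, assuming a Spark build with Hive support and a hypothetical Hive table named src:

    // HiveContext adds support for Hive tables, HiveQL queries, and Hive UDFs.
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val hiveDF = hiveContext.sql("SELECT key, value FROM src")   // src: hypothetical table
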
    • Spark SQL supports two different methods for converting existing RDDs into DataFrames.

      • The first method uses reflection to infer the schema of an RDD that contains specific types of objects. 

    The case class defines the schema of the table: the names of the arguments to the case class are read using reflection and become the names of the columns. The RDD of case class objects can be implicitly converted to a DataFrame, which is then registered as a table, and that table can be used in subsequent SQL statements.

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._
    
    // Define the schema using a case class.
    // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
    // you can use custom classes that implement the Product interface.
    case class Person(name: String, age: Int)
    
    // Create an RDD of Person objects and register it as a table.
    val people = sc.textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
      .toDF()
    people.registerTempTable("people")
    
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
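
    In Spark 1.4 and later (an assumption about the exact version), row fields can also be accessed by name rather than by ordinal:

    // Access the result column by name instead of position (Spark 1.4+ Row API).
    teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)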
      • The second method is a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.
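
    A minimal sketch of this programmatic method, following the pattern in the Spark SQL programming guide and reusing the same people.txt file:

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Import Row and the Spark SQL data types used to build the schema.
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    // The schema is encoded in a string, e.g. one only known at runtime.
    val schemaString = "name age"

    // Generate the schema from the string.
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert records of the raw RDD to Rows.
    val people = sc.textFile("examples/src/main/resources/people.txt")
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

    // Apply the schema to the RDD of Rows.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    // Register the DataFrame as a table and query it as before.
    peopleDataFrame.registerTempTable("people")
    val results = sqlContext.sql("SELECT name FROM people")
    results.map(t => "Name: " + t(0)).collect().foreach(println)
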
    • From data sources: 
    val df = sqlContext.load("people.json", "json")
    df.select("name", "age").save("namesAndAges.parquet", "parquet")

    or   

    val df = sqlContext.jsonFile("examples/src/main/resources/people.json")   
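
    Note that load/save with an explicit source-name argument, as well as jsonFile, were deprecated in Spark 1.4 in favor of the generic DataFrameReader/DataFrameWriter interface. A sketch of the equivalent calls:

    // Spark 1.4+ generic read/write interface.
    val dfJson = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
    dfJson.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

    // Shorthand for built-in sources:
    val df2 = sqlContext.read.json("examples/src/main/resources/people.json")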