zoukankan      html  css  js  c++  java
  • Spark SQL and DataFrame Guide(1.4.1)——之DataFrames

    Spark SQL是处理结构化数据的Spark模块。它提供了DataFrames这样的编程抽象。同一时候也能够作为分布式SQL查询引擎使用。

    DataFrames

    DataFrame是一个带有列名的分布式数据集合。等同于一张关系型数据库中的表或者R/Python中的data frame,只是在底层做了非常多优化;我们能够使用结构化数据文件、Hive tables,外部数据库或者RDDS来构造DataFrames。

    1. 開始入口:

    入口须要从SQLContext类或者它的子类開始,当然须要使用SparkContext创建SQLContext;这里我们使用pyspark(已经自带了SQLContext即sc):

    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)

    还能够使用HiveContext,它能够提供比SQLContext很多其它的功能。比如能够使用更完整的HiveQL解析器写查询,使用Hive UDFs。从Hive表中读取数据等。

    使用HiveContext并不须要安装hive,Spark默认将HiveContext单独打包避免对hive过多的依赖

    2.创建DataFrames
    使用JSON文件创建:

    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)
    
    df = sqlContext.read.json("examples/src/main/resources/people.json")
    
    # Displays the content of the DataFrame to stdout
    df.show()

    注意:
    这里你可能须要将文件存入HDFS(这里的文件在Spark安装文件夹中,1.4版本号)

    hadoop fs -mkdir examples/src/main/resources/
    hadoop fs -put /appcom/spark/examples/src/main/resources/*         /user/hdpuser/examples/src/main/resources/

    3.DataFrame操作

    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)
    
    # Create the DataFrame
    df = sqlContext.read.json("examples/src/main/resources/people.json")
    
    # Show the content of the DataFrame
    df.show()
    ## age  name
    ## null Michael
    ## 30   Andy
    ## 19   Justin
    
    # Print the schema in a tree format
    df.printSchema()
    ## root
    ## |-- age: long (nullable = true)
    ## |-- name: string (nullable = true)
    
    # Select only the "name" column
    df.select("name").show()
    ## name
    ## Michael
    ## Andy
    ## Justin
    
    # Select everybody, but increment the age by 1
    df.select(df['name'], df['age'] + 1).show()
    ## name    (age + 1)
    ## Michael null
    ## Andy    31
    ## Justin  20
    
    # Select people older than 21
    df.filter(df['age'] > 21).show()
    ## age name
    ## 30  Andy
    
    # Count people by age
    df.groupBy("age").count().show()
    ## age  count
    ## null 1
    ## 19   1
    ## 30   1

    4.使用编程执行SQL查询
    SQLContext能够使用编程执行SQL查询并返回DataFrame。

    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)
    df = sqlContext.sql("SELECT * FROM table")

    5.和RDD交互

    将RDD转换成DataFrames有两种方法:

    • 利用反射来判断包括特定类型对象的RDD的schema。这样的方法会简化代码而且在你已经知道schema的时候非常适用。
    • 使用编程接口。构造一个schema并将其应用在已知的RDD上。

    一、利用反射判断Schema
    Spark SQL能够将含Row对象的RDD转换成DataFrame。并判断数据类型。通过将一个键值对(key/value)列表作为kwargs传给Row类来构造Rows。

    key定义了表的列名,类型通过看第一列数据来判断。

    (所以这里RDD的第一列数据不能有缺失)未来版本号中将会通过看很多其它数据来判断数据类型。像如今对JSON文件的处理一样。

    # sc is an existing SparkContext.
    from pyspark.sql import SQLContext, Row
    sqlContext = SQLContext(sc)
    
    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
    
    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = sqlContext.createDataFrame(people)
    schemaPeople.registerTempTable("people")
    
    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    
    # The results of SQL queries are RDDs and support all the normal RDD operations.
    teenNames = teenagers.map(lambda p: "Name: " + p.name)
    for teenName in teenNames.collect():
      print teenName

    二、编程指定Schema
    通过编程指定Schema须要3步:

    1. 从原来的RDD创建一个元祖或列表的RDD。
    2. 用StructType 创建一个和步骤一中创建的RDD中元祖或列表的结构相匹配的Schema。

    3. 通过SQLContext提供的createDataFrame方法将schema 应用到RDD上。

    # Import SQLContext and data types
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    
    # sc is an existing SparkContext.
    sqlContext = SQLContext(sc)
    
    # Load a text file and convert each line to a tuple.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: (p[0], p[1].strip()))
    
    # The schema is encoded in a string.
    schemaString = "name age"
    
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)
    
    # Apply the schema to the RDD.
    schemaPeople = sqlContext.createDataFrame(people, schema)
    
    # Register the DataFrame as a table.
    schemaPeople.registerTempTable("people")
    
    # SQL can be run over DataFrames that have been registered as a table.
    results = sqlContext.sql("SELECT name FROM people")
    
    # The results of SQL queries are RDDs and support all the normal RDD operations.
    names = results.map(lambda p: "Name: " + p.name)
    for name in names.collect():
      print name
  • 相关阅读:
    LIst判断是否为空
    SYBASE数据库基本操作
    Java内部类详解--成员内部类,局部内部类,匿名内部类,静态内部类
    String转jsonarry:字符串:[{"result":"20"},{"result":"21"},{"result":"20"},{"result":"22"}]
    使用HttpURLConnection发送POST请求
    Java 定义字符串数组
    JSP 解决illegal to have multiple occurrences of contentType with different values错误
    javascript 获取url参数值
    外键为','(逗号)拼接ID,连接查询外键表ID
    excel、csv、txt文件数据读取
  • 原文地址:https://www.cnblogs.com/lytwajue/p/7290254.html
Copyright © 2011-2022 走看看