zoukankan      html  css  js  c++  java
  • Spark SQL结构化数据处理

    Spark SQL是Spark框架的重要组成部分, 主要用于结构化数据处理和对Spark数据执行类SQL的查询。

    DataFrame是一个分布式的,按照命名列的形式组织的数据集合。 一张SQL数据表可以映射为一个DataFrame对象,DataFrame是Spark SQL中的主要数据结构。

    SqlContext实例是DataFrame和Spark SQL的操作入口, pyspark交互环境中已初始化了一个sqlContext实例, 在提交任务脚本时需要使用一个SparkContext来初始化:

    from pyspark.sql import SQLContext
    sqlContext = SqlContext(sparkContext)
    

    本文测试环境为Spark 2.1.0, Python API.

    创建DataFrame

    SqlContext.createDataFrame方法可以从python的list中创建DataFrame:

    >>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)]
    >>> df = sqlContext.createDataFrame(data)
    >>> df.collect()
    [Row(_1=u'a', _2=1, _3=18), 
    Row(_1=u'b', _2=2, _3=22),
    Row(_1=u'c', _2=3, _3=20)]
    

    list中的每一项成为DataFrame中的一行, 每一列的名字默认为_1, _2, _3.

    同样可以使用RDD来创建:

    >>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)]
    >>> rdd = sc.parallelize(data)
    >>> df = sqlContext.createDataFrame(rdd)
    >>> df.collect()
    [Row(_1=u'a', _2=1, _3=18), 
    Row(_1=u'b', _2=2, _3=22),
    Row(_1=u'c', _2=3, _3=20)]
    

    或者采用更简单的方法:

    >>> df = rdd.toDF()
    >>> >>> df.collect()
    [Row(_1=u'a', _2=1, _3=18), 
    Row(_1=u'b', _2=2, _3=22),
    Row(_1=u'c', _2=3, _3=20)]
    

    createFrame的第二个参数为可选参数schema用于定义每一列的名称和类型:

    >>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)]
    >>> df = sqlContext.createDataFrame(data, ['name', 'id', 'age'])
    >>> df.collect()
    [Row(name=u'a', id=1, age=18), 
    Row(name=u'b', id=2, age=22),
    Row(name=u'c', id=3, age=20)]
    

    同样可以使用元素为dict的列表创建DataFrame实例:

    >>> data = [
    ... {'name':'a', 'id':1, 'age': 18}, 
    ... {'name':'b', 'id':2, 'age': 22},
    ... {'name':'c', 'id':3, 'age': 20}]
    >>> df = sqlContext.createDataFrame(data)
    >>> df.collect()
    [Row(name=u'a', id=1, age=18), 
    Row(name=u'b', id=2, age=22),
    Row(name=u'c', id=3, age=20)]
    

    不过Spark官方推荐使用Row对象来代替dict:

    >>> from pyspark.sql import Row
    >>> User = Row('name', 'id', 'age')
    >>> row1 = User('a', 1, 18)
    >>> row2 = User('b', 2, 22)
    >>> row3 = User('b', 3, 20)
    >>> data = [row1, row2, row3]
    >>> df = sqlContext.createDataFrame(data)
    >>> df.collect()
    [Row(name=u'a', id=1, age=18), 
    Row(name=u'b', id=2, age=22),
    Row(name=u'c', id=3, age=20)]
    

    schema参数也可以使用pyspark中定义的字段类型:

    >>> from pyspark.sql.types import StructType, StructField
    >>> from pyspark.sql.types import StringType, IntegerType
    >>> schema = StructType([
    ... StructField("name", StringType(), True),  # name, type, nullable
    ... StructField("id", IntegerType(), True),
    ... StructField("age", IntegerType(), True)])
    >>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)]
    >>> df = sqlContext.createDataFrame(data, schema)
    >>> df.collect()
    [Row(name=u'a', id=1, age=18), 
    Row(name=u'b', id=2, age=22), 
    Row(name=u'c', id=3, age=20)]
    

    更多关于createDataFrame方法的信息可以参考官方文档

    SqlContext.read是一个pyspark.sql.DataFrameReader对象, 它可以用于根据外部数据源创建DataFrame, 包括读取文件和使用jdbc读取数据库。

    详情可以参考官方文档

    DataFrame操作

    DataFrame提供了一些常用操作的实现, 可以使用这些接口查看或修改DataFrame:

    • df.collect(): 以Row列表的方式显示df中的所有数据
    • df.show(): 以可视化表格的方式打印df中的所有数据
    • df.count(): 显示df中数据的行数
    • df.describe() 返回一个新的DataFrame对象包含对df中数值列的统计数据
    • df.cache(): 以MEMORY_ONLY_SER方式进行持久化
    • df.persist(level): 以指定的方式进行持久化
    • df.unpersist(): 删除缓存

    DataFrame的一些属性可以用于查看它的结构信息:

    • df.columns: 返回各列名称的列表

    • df.schema: 以StructType对象的形式返回df的表结构

    • df.dtypes: 以列表的形式返回每列的名称和类型。
      [('name', 'string'), ('id', 'int')]

    • df.rdd 将DataFrame对象转换为rdd

    DataFrame支持使用Map和Reduce操作:

    • df.map(func): 等同于df.rdd.map(func)

    • df.reduce(func): 等同于 df.rdd.reduce(func)

    DataFrame的结构可以进行一些修改:

    • df.drop(col): 返回一个删除指定列后的DataFrame对象:
    >>> df.drop('age')
    DataFrame[age:int, id: int]
    
    >>>df.drop(df.name)
    DataFrame[age:int, id: int]
    

    同样可以查询DataFrame中特定的记录:

    • df.take(index): 以列表的形式返回df的前n条记录, 下标从1开始

    • df.first(): 返回df中的第一个Row对象

    • df.filter(cond): 返回只包含满足条件记录的新DataFrame对象

    >>> df.filter(df.age>=20).collect()
    [Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)]
    
    • df.select(col): 返回只包含指定列的新DataFrame对象:
    >>> df.select('*').collect()
    [Row(name=u'a', id=1, age=18), Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)]
    
    >>> df.select(df.id, df.age-1).collect()
    [Row(id=1, (age - 1)=17), Row(id=2, (age - 1)=21), Row(id=3, (age - 1)=19)]
    
    • df.join(other, on=None, how=None)将df和other两个DataFrame对象连接为一个DataFrame对象.
      • on: 指定连接的列
      • how: 指定连接方式:'inner', 'outer', 'left_outer', 'right_outer', 'leftsemi', 默认为'inner'
    >>> df.collect()
    [Row(name=u'a', id=1, age=18), Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)]
    >>> df2.collect()
    [Row(id=1, nation=u'cn'), Row(id=2, nation=u'us'), Row(id=4, nation=u'uk')]
    
    >>> df.join(df2, 'id').collect()
    [Row(id=1, name=u'a', age=18, nation=u'cn'), Row(id=2, name=u'b', age=22, nation=u'us')]
    
    • df.limit(num): 返回一个新的DataFrame对象, 其记录数不超过num, 多余的记录将被删除.

    • df.distinct() : 返回一个新的去除重复行后的DataFrame对象

    更多信息可以参考官方文档

  • 相关阅读:
    关于GitHub推送时发生Permission denied (publickey)的问题
    线性模型——机器学习(西瓜书)读书笔记
    梯度下降算法的简单理解
    PRML学习笔记第一章
    python函数学习之装饰器
    机器学习 概论
    Mybatis
    Nginx 常用配置清单
    接口,抽象类
    IntelliJ IDEA打war包
  • 原文地址:https://www.cnblogs.com/Finley/p/6390528.html
Copyright © 2011-2022 走看看