zoukankan      html  css  js  c++  java
  • Databricks 第2篇:pyspark.sql 简介

    pyspark中的DataFrame等价于Spark SQL中的一个关系表。在pyspark中,DataFrame由Column和Row构成。

    • pyspark.sql.SparkSession:是DataFrame和SQL函数的主要入口
    • DataFrameReader:读取数据,返回DataFrame
    • DataFrameWriter:把DataFrame存储到其他存储系统
    • pyspark.sql.DataFrame、pyspark.sql.Column和 pyspark.sql.Row

    一,SparkSession类

    在操作DataFrame之前,首先需要创建SparkSession,通过SparkSession来操作DataFrame。

    1,创建SparkSession

    通过Builder类来创建SparkSession,在Databricks Notebook中,spark是默认创建,表示一个SparkSession对象:

    spark = SparkSession.builder 
        .master("local") 
        .appName("Word Count") 
        .config("spark.some.config.option", "some-value") 
        .getOrCreate()

    函数注释:

    • master(master):用于设置要连接的Spark的master URL,例如local表示在本地运行,local[4] 在本地使用4核运行,
    • appName(name):为application设置一个名字
    • config(key=Nonevalue=Noneconf=None):设置SparkSession的配置选项,
    • getOrCreate():获得一个已存在的或者创建一个新的SparkSession

    2,从常量数据中创建DataFrame

    从RDD、list或pandas.DataFrame 创建DataFrame:

    createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

    3,从SQL查询中创建DataFrame

    从一个给定的SQL查询或Table中获取DataFrame,举个例子:

    df.createOrReplaceTempView("table1")
    
    #use SQL query to fetch data
    df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
    
    #use table to fetch data
    df2 = spark.table("table1")

    4,SparkSession的两个重要属性

    read:该属性是DataFrameReader 对象,用于读取数据,返回DataFrame对象

    readStream:该属性是DataStreamReader对象,用于读取Data Stream,返回 流式的DataFrame对象( streaming DataFrame)

    二,DataFrameReader类

    从外部存储系统中读取数据,返回DataFrame对象,通常使用SparkSession.read来访问,通用语法是先调用format()函数来指定输入数据的格式,后调用load()函数从数据源加载数据,并返回DataFrame对象:

    df = spark.read.format('json').load('python/test_support/sql/people.json')

    对于不同的格式,DataFrameReader类有细分的函数来加载数据:

    df_csv = spark.read.csv('python/test_support/sql/ages.csv')
    df_json = spark.read.json('python/test_support/sql/people.json')
    df_txt = spark.read.text('python/test_support/sql/text-test.txt')
    df_parquet = spark.read.parquet('python/test_support/sql/parquet_partitioned')
    
    # read a table as a DataFrame
    df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
    df.createOrReplaceTempView('tmpTable')
    spark.read.table('tmpTable')

    还可以通过jdbc,从JDBC URL中构建DataFrame

    jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)

    三,DataFrameWriter类

    用于把DataFrame写入到外部存储系统中,通过DataFrame.write来访问。

    (df.write.format('parquet')  
        .mode("overwrite")
        .saveAsTable('bucketed_table'))

    函数注释:

    • format(source):指定底层输出的源的格式
    • mode(saveMode):当数据或表已经存在时,指定数据存储的行为,保存的模式有:append、overwrite、error和ignore。
    • saveAsTable(nameformat=Nonemode=NonepartitionBy=None**options):把DataFrame 存储为表
    • save(path=Noneformat=Nonemode=NonepartitionBy=None**options):把DataFrame存储到数据源中

    对于不同的格式,DataFrameWriter类有细分的函数来加载数据:

    df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
    df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
    df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
    df.write.txt(os.path.join(tempfile.mkdtemp(), 'data'))
    
    #wirte data to external database via jdbc
    df.write.jdbc(url, table, mode=None, properties=None)

    把DataFrame内容存储到源中:

    df.write.mode("append").save(os.path.join(tempfile.mkdtemp(), 'data'))

    把DataFrame的内容存到表中:

    df.write.saveAsTable(name='db_name.table_name',format='delta')

    四,DataFrame操作

    DataFrame等价于Spark SQL中的关系表,

    1,常规操作

    从parquet 文件中读取数据,返回一个DataFrame对象:

    people = spark.read.parquet("...")

    从DataFrame对象返回一列:

    ageCol = people.age

    从DataFrame对象中row的集合:

    people.collect()

    从DataFrame对象中删除列:

    people.drop(*cols)

    2,创建临时视图

    可以创建全局临时视图,也可以创建本地临时视图,对于local view,临时视图的生命周期和SparkSession相同;对于global view,临时视图的生命周期由Spark application决定。

    createOrReplaceGlobalTempView(name)
    createGlobalTempView(name)
    createOrReplaceTempView(name)
    createTempView(name)

    3,DataFrame数据的查询

    df.filter(df.age > 3)
    df.select('name', 'age')
    
    # join
    cond = [df.name == df3.name, df.age == df3.age]
    df.join(df3, cond, 'outer').select(df.name, df3.age)
    
    #group by 
    df.groupBy('name').agg({'age': 'mean'})

     

    参考文档:

    pyspark.sql module

  • 相关阅读:
    Buildroot构建指南--Overview
    监控摄像机常识:宽动态 (WDR)介绍和理解
    HM visual studio编译报错
    宽带有哪几种接入方式
    V.24 V.35 ISDN E1 POS这些常见的广域网接口
    Linux ppp 数据收发流程
    ppp协议解析二
    PPP协议解析一
    TAP/TUN(二)
    TAP/TUN浅析
  • 原文地址:https://www.cnblogs.com/ljhdo/p/14177036.html
Copyright © 2011-2022 走看看