zoukankan      html  css  js  c++  java
  • 《Spark Python API 官方文档中文版》 之 pyspark.sql (二)

    摘要:在Spark开发中,由于需要用Python实现,发现API与Scala的略有不同,而Python API的中文资料相对很少。每次去查英文版API的说明相对比较慢,还是中文版比较容易get到所需,所以利用闲暇之余将官方文档翻译为中文版,并亲测Demo的代码。在此记录一下,希望对那些对Spark感兴趣和从事大数据开发的人员提供有价值的中文资料,对PySpark开发人员的工作和学习有所帮助。

    官网地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html            

    pyspark.sql module

    Module Context

    Spark SQL和DataFrames重要的类有:
    pyspark.sql.SQLContext DataFrame和SQL方法的主入口
    pyspark.sql.DataFrame 将分布式数据集分组到指定列名的数据框中
    pyspark.sql.Column DataFrame中的列
    pyspark.sql.Row DataFrame数据的行
    pyspark.sql.HiveContext 访问Hive数据的主入口
    pyspark.sql.GroupedData 由DataFrame.groupBy()创建的聚合方法集
    pyspark.sql.DataFrameNaFunctions 处理丢失数据(空数据)的方法
    pyspark.sql.DataFrameStatFunctions 统计功能的方法
    pyspark.sql.functions DataFrame可用的内置函数
    pyspark.sql.types 可用的数据类型列表
    pyspark.sql.Window 用于处理窗口函数

    3.class pyspark.sql.DataFrame(jdf, sql_ctx)

    分布式的收集数据分组到命名列中。
    一个DataFrame相当于在Spark SQL中一个相关的表,可在SQLContext使用各种方法创建,如:

    people = sqlContext.read.parquet("...")

    一旦创建, 可以使用在DataFrame、Column中定义的不同的DSL方法操作。
    从data frame中返回一列使用对应的方法:

    ageCol = people.age

    一个更具体的例子:

    # To create DataFrame using SQLContext
    people = sqlContext.read.parquet("...")
    department = sqlContext.read.parquet("...")
    people.filter(people.age > 30).join(department, people.deptId == department.id)).groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})

    3.1 agg(*exprs)

    没有组的情况下聚集整个DataFrame (df.groupBy.agg()的简写)。

    >>> l=[('jack',5),('john',4),('tom',2)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> df.agg({"age": "max"}).collect()
    [Row(max(age)=5)]
    >>> from pyspark.sql import functions as F
    >>> df.agg(F.min(df.age)).collect()
    [Row(min(age)=2)]

    3.2 alias(alias)

    返回一个设置别名的新的DataFrame。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> from pyspark.sql.functions import *
    >>> df_as1 = df.alias("df_as1")
    >>> df_as2 = df.alias("df_as2")
    >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
    >>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect()
    [Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)]

    3.3 cache()

    用默认的存储级别缓存数据(MEMORY_ONLY_SER).

    3.4 coalesce(numPartitions)

    返回一个有确切的分区数的分区的新的DataFrame。
    与在一个RDD上定义的合并类似, 这个操作产生一个窄依赖。 如果从1000个分区到100个分区,不会有shuffle过程, 而是每100个新分区会需要当前分区的10个。

    >>> df.coalesce(1).rdd.getNumPartitions()
    1

    3.5 collect()

    返回所有的记录数为行的列表。

    >>> df.collect()
    [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]

    3.6 columns

    返回所有列名的列表。

    >>> df.columns
    ['age', 'name']

    3.7 corr(col1, col2, method=None)

    计算一个DataFrame相关的两列为double值。通常只支持皮尔森相关系数。DataFrame.corr()和DataFrameStatFunctions.corr()类似。
    参数:●  col1 – 第一列的名称  
          ●  col2 – 第二列的名称
               ●  method – 相关方法.当前只支持皮尔森相关系数

    3.8 count()

    返回DataFrame的行数。

    >>> df.count()
    2

    3.9 cov(col1, col2)

    计算由列名指定列的样本协方差为double值。DataFrame.cov()和DataFrameStatFunctions.cov()类似。
    参数:●  col1 – 第一列的名称
          ●  col2 – 第二列的名称

    3.10 crosstab(col1, col2)

    计算给定列的分组频数表,也称为相关表。每一列的去重值的个数应该小于1e4.最多返回1e6个非零对.每一行的第一列会是col1的去重值,列名称是col2的去重值。第一列的名称是$col1_$col2. 没有出现的配对将以零作为计数。DataFrame.crosstab() and DataFrameStatFunctions.crosstab()类似。
    参数:●  col1 – 第一列的名称. 去重项作为每行的第一项。
          ●  col2 – 第二列的名称. 去重项作为DataFrame的列名称。

    3.11 cube(*cols)

    创建使用指定列的当前DataFrame的多维立方体,这样可以聚合这些数据。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> df.cube('name', df.age).count().show()
    +-----+----+-----+
    | name| age|count|
    +-----+----+-----+
    | null|   2|    1|
    |Alice|null|    1|
    |  Bob|   5|    1|
    |  Bob|null|    1|
    | null|   5|    1|
    | null|null|    2|
    |Alice|   2|    1|
    +-----+----+-----+

    3.12 describe(*cols)

    计算数值列的统计信息。
    包括计数,平均,标准差,最小和最大。如果没有指定任何列,这个函数计算统计所有数值列。

    >>> df.describe().show()
    +-------+------------------+
    |summary|               age|
    +-------+------------------+
    |  count|                 2|
    |   mean|               3.5|
    | stddev|2.1213203435596424|
    |    min|                 2|
    |    max|                 5|
    +-------+------------------+
    >>> df.describe(['age', 'name']).show()
    +-------+------------------+-----+
    |summary|               age| name|
    +-------+------------------+-----+
    |  count|                 2|    2|
    |   mean|               3.5| null|
    | stddev|2.1213203435596424| null|
    |    min|                 2|Alice|
    |    max|                 5|  Bob|
    +-------+------------------+-----+

    3.13 distinct()

    返回行去重的新的DataFrame。

    >>> l=[('Alice',2),('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> df.distinct().count()
    2

    3.14 drop(col)

    返回删除指定列的新的DataFrame。
    参数:●  col – 要删除列的字符串类型名称,或者要删除的列。

    >>> df.drop('age').collect()
    [Row(name=u'Alice'), Row(name=u'Bob')] 
    >>> df.drop(df.age).collect()
    [Row(name=u'Alice'), Row(name=u'Bob')]
    >>> l1=[('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> l2=[('Bob',85)]
    >>> df2 = sqlContext.createDataFrame(l2,['name','height'])
    >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
    [Row(age=5, height=85, name=u'Bob')]
    >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
    [Row(age=5, name=u'Bob', height=85)]

    3.15 dropDuplicates(subset=None)

    返回去掉重复行的一个新的DataFrame,通常只考虑某几列。
    drop_duplicates()和dropDuplicates()类似。

    >>> from pyspark.sql import Row
    >>> df = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)]).toDF()
    >>> df.dropDuplicates().show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    |  5|    80|Alice|
    | 10|    80|Alice|
    +---+------+-----+
    >>> df.dropDuplicates(['name', 'height']).show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    |  5|    80|Alice|
    +---+------+-----+

    3.16 drop_duplicates(subset=None)

    与以上相同。

    3.17 dropna(how='any', thresh=None, subset=None)

    返回一个删除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()类似。
    参数:●  how – 'any'或者'all'。如果'any',删除包含任何空值的行。如果'all',删除所有值为null的行。
        thresh – int,默认为None,如果指定这个值,删除小于阈值的非空值的行。这个会重写'how'参数。
       ●  subset – 选择的列名称列表。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> dfnew = df.cube('name', df.age).count()
    >>> dfnew.show()
    +-----+----+-----+
    | name| age|count|
    +-----+----+-----+
    | null|   2|    1|
    |Alice|null|    1|
    |  Bob|   5|    1|
    |  Bob|null|    1|
    | null|   5|    1|
    | null|null|    2|
    |Alice|   2|    1|
    +-----+----+-----+
    >>> dfnew.na.drop().show()
    +-----+---+-----+
    | name|age|count|
    +-----+---+-----+
    |  Bob|  5|    1|
    |Alice|  2|    1|
    +-----+---+-----+

    3.18 dtypes

    返回所有列名及类型的列表。

    >>> df.dtypes
    [('age', 'int'), ('name', 'string')]

    3.19 explain(extended=False)

    将(逻辑和物理)计划打印到控制台以进行调试。
    参数:●  extended – boolean类型,默认为False。如果为False,只打印物理计划。

    >>> df.explain()
    == Physical Plan ==
    Scan ExistingRDD[age#0,name#1]
    >>> df.explain(True)
    == Parsed Logical Plan ==
    ...
    == Analyzed Logical Plan ==
    ...
    == Optimized Logical Plan ==
    ...
    == Physical Plan ==
    ...

    3.20 fillna(value, subset=None)

    替换空值,和na.fill()类似,DataFrame.fillna()和dataframenafunctions.fill()类似。
    参数:●  value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset参数将被忽略。值必须是要替换的列的映射,替换值必须是int,long,float或者string.
          ●  subset - 要替换的列名列表。在subset指定的列,没有对应数据类型的会被忽略。例如,如果值是字符串,subset包含一个非字符串的列,这个非字符串的值会被忽略。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> dfnew = df.cube('name', df.age).count()
    >>> dfnew.show()
    +-----+----+-----+
    | name| age|count|
    +-----+----+-----+
    | null|   2|    1|
    |Alice|null|    1|
    |  Bob|   5|    1|
    |  Bob|null|    1|
    | null|   5|    1|
    | null|null|    2|
    |Alice|   2|    1|
    +-----+----+-----+
    >>> dfnew.na.fill(50).show()
    +-----+---+-----+
    | name|age|count|
    +-----+---+-----+
    | null|  2|    1|
    |Alice| 50|    1|
    |  Bob|  5|    1|
    |  Bob| 50|    1|
    | null|  5|    1|
    | null| 50|    2|
    |Alice|  2|    1|
    +-----+---+-----+
    >>> dfnew.na.fill({'age': 50, 'name': 'unknown'}).show()
    +-------+---+-----+
    |   name|age|count|
    +-------+---+-----+
    |unknown|  2|    1|
    |  Alice| 50|    1|
    |    Bob|  5|    1|
    |    Bob| 50|    1|
    |unknown|  5|    1|
    |unknown| 50|    2|
    |  Alice|  2|    1|
    +-------+---+-----+

    3.21 filter(condition)

    用给定的条件过滤行。
    where()和filter()类似。
    参数:●  条件 - 一个列的bool类型或字符串的SQL表达式。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> df.filter(df.age > 3).collect()
    [Row(age=5, name=u'Bob')]
    >>> df.where(df.age == 2).collect()
    [Row(age=2, name=u'Alice')]
    >>> df.filter("age > 3").collect()
    [Row(age=5, name=u'Bob')]
    >>> df.where("age = 2").collect()
    [Row(age=2, name=u'Alice')]

    3.22 first()

    返回第一行。

    >>> df.first()
    Row(age=2, name=u'Alice')

    3.23 flatMap(f)

    返回在每行应用F函数后的新的RDD,然后将结果压扁。
    是df.rdd.flatMap()的简写。

    >>> df.flatMap(lambda p: p.name).collect()
    [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']

    3.24 foreach(f)

    应用f函数到DataFrame的所有行。
    是df.rdd.foreach()的简写。

    >>> def f(person):
    ...     print(person.name)
    >>> df.foreach(f)
    Alice
    Bob

    3.25 foreachPartition(f)

    应用f函数到DataFrame的每一个分区。
    是 df.rdd.foreachPartition()的缩写。

    >>> def f(people):
    ...     for person in people:
    ...         print(person.name)
    >>> df.foreachPartition(f)
    Alice
    Bob

    3.26 freqItems(cols, support=None)

    参数:●  cols – 要计算重复项的列名,为字符串类型的列表或者元祖。
          ●  support – 要计算频率项的频率值。默认是1%。参数必须大于1e-4.

    3.27 groupBy(*cols)

    使用指定的列分组DataFrame,这样可以聚合计算。可以从GroupedData查看所有可用的聚合方法。
    groupby()和groupBy()类似。
    参数:●  cols – 分组依据的列。每一项应该是一个字符串的列名或者列的表达式。

    >>> df.groupBy().avg().collect()
    [Row(avg(age)=3.5)]
    >>> df.groupBy('name').agg({'age': 'mean'}).collect()
    [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
    >>> df.groupBy(df.name).avg().collect()
    [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
    >>> df.groupBy(['name', df.age]).count().collect()
    [Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)]

    3.28 groupby(*cols)

    和以上一致

    3.29 head(n=None)

    返回前n行
    参数:●  n – int类型,默认为1,要返回的行数。
    返回值: 如果n大于1,返回行列表,如果n为1,返回单独的一行。

    >>> df.head()
    Row(age=2, name=u'Alice')
    >>> df.head(1)
    [Row(age=2, name=u'Alice')]

    3.30 insertInto(tableName, overwrite=False)

    插入DataFrame内容到指定表。
    注:在1.4中已过时,使用DataFrameWriter.insertInto()代替。

    3.31 intersect(other)

    返回新的DataFrame,包含仅同时在当前框和另一个框的行。
    相当于SQL中的交集。

    3.32 intersect(other)

    如果collect()和take()方法可以运行在本地(不需要Spark executors)那么返回True

    3.33 join(other, on=None, how=None)

    使用给定的关联表达式,关联另一个DataFrame。
    以下执行df1和df2之间完整的外连接。
    参数:● other – 连接的右侧
       ● on – 一个连接的列名称字符串, 列名称列表,一个连接表达式(列)或者列的列表。如果on参数是一个字符串或者字符串列表,表示连接列的名称,这些名称必须同时存在join的两个表中, 这样执行的是一个等价连接。
       ● how – 字符串,默认'inner'。inner,outer,left_outer,right_outer,leftsemi之一。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> l2=[('Tom',80),('Bob',85)]
    >>> df2 = sqlContext.createDataFrame(l2,['name','height'])
    >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
    [Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)]
    >>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
    [Row(name=u'Tom', height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)]
    >>> l3=[('Alice',2,60),('Bob',5,80)]
    >>> df3 = sqlContext.createDataFrame(l3,['name','age','height'])
    >>> cond = [df.name == df3.name, df.age == df3.age]
    >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
    >>> df.join(df2, 'name').select(df.name, df2.height).collect()
    [Row(name=u'Bob', height=85)]
    >>> l4=[('Alice',1),('Bob',5)]
    >>> df4 = sqlContext.createDataFrame(l4,['name','age'])
    >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
    [Row(name=u'Bob', age=5)]

    3.34 limit(num)

    将结果计数限制为指定的数字。

    >>> df.limit(1).collect()
    [Row(age=2, name=u'Alice')]
    >>> df.limit(0).collect()
    []

    3.35 map(f)

    通过每行应用f函数返回新的RDD。
    是 df.rdd.map()的缩写。

    >>> df.map(lambda p: p.name).collect()
    [u'Alice', u'Bob']

    3.36 mapPartitions(f, preservesPartitioning=False)

    通过每个分区应用f函数返回新的RDD
    是df.rdd.mapPartitions()的缩写。

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(iterator): yield 1
    ...
    >>> rdd.mapPartitions(f).sum()
    4

    3.37 na

    返回DataFrameNaFunctions用于处理缺失值。

    3.38 orderBy(*cols, **kwargs)

    返回按照指定列排序的新的DataFrame。
    参数:● cols – 用来排序的列或列名称的列表。
          ● ascending – 布尔值或布尔值列表(默认 True). 升序排序与降序排序。指定多个排序顺序的列表。如果指定列表, 列表的长度必须等于列的长度。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> df.sort(df.age.desc()).collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
    >>> df.sort("age", ascending=False).collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
    >>> df.orderBy(df.age.desc()).collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
    >>> from pyspark.sql.functions import *
    >>> df.sort(asc("age")).collect()
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
    >>> df.orderBy(desc("age"), "name").collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
    >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]

    3.39 persist(storageLevel=StorageLevel(False, True, False, False, 1))

    设置存储级别以在第一次操作运行完成后保存其值。这只能用来分配新的存储级别,如果RDD没有设置存储级别的话。如果没有指定存储级别,默认为(memory_only_ser)。

    3.40 printSchema()

    打印schema以树的格式

    >>> df.printSchema()
    root
     |-- name: string (nullable = true)
     |-- age: long (nullable = true)

    3.41 randomSplit(weights, seed=None)

    按照提供的权重随机的划分DataFrame。
    参数:● weights – doubles类型的列表做为权重来划分DataFrame。权重会被恢复如果总值不到1.0。
           seed – random的随机数。

    >>> l4=[('Alice',1),('Bob',5),('Jack',8),('Tom',10)]
    >>> df4 = sqlContext.createDataFrame(l4,['name','age'])
    >>> splits = df4.randomSplit([1.0, 2.0],24)
    >>> splits[0].count()
    1
    >>> splits[1].count()
    3

    3.42 rdd

    返回内容为行的RDD。

    3.43 registerAsTable(name)

    注:在1.4中已过时,使用registerTempTable()代替。

    3.44 registerTempTable(name)

    使用给定的名字注册该RDD为临时表
    这个临时表的有效期与用来创建这个DataFrame的SQLContext相关

    >>> df.registerTempTable("people")
    >>> df2 = sqlContext.sql("select * from people")
    >>> sorted(df.collect()) == sorted(df2.collect())
    True

    3.45 repartition(numPartitions, *cols)

    按照给定的分区表达式分区,返回新的DataFrame。产生的DataFrame是哈希分区。
    numPartitions参数可以是一个整数来指定分区数,或者是一个列。如果是一个列,这个列会作为第一个分区列。如果没有指定,将使用默认的分区数。
    1.6版本修改: 添加可选参数可以指定分区列。如果分区列指定的话,numPartitions也是可选的。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> df.repartition(10).rdd.getNumPartitions()
    10
    >>> data = df.unionAll(df).repartition("age")
    >>> data.show()
    +-----+---+
    | name|age|
    +-----+---+
    |Alice|  2|
    |Alice|  2|
    |  Bob|  5|
    |  Bob|  5|
    +-----+---+
    >>> data = data.repartition(7, "age")
    >>> data.show()
    +-----+---+
    | name|age|
    +-----+---+
    |  Bob|  5|
    |  Bob|  5|
    |Alice|  2|
    |Alice|  2|
    +-----+---+
    >>> data.rdd.getNumPartitions()
    7
    >>> data = data.repartition("name", "age")
    >>> data.show()
    +-----+---+
    | name|age|
    +-----+---+
    |  Bob|  5|
    |  Bob|  5|
    |Alice|  2|
    |Alice|  2|
    +-----+---+

    3.46 replace(to_replace, value, subset=None)

    返回用另外一个值替换了一个值的新的DataFrame。DataFrame.replace() 和 DataFrameNaFunctions.replace() 类似。
    参数:● to_replace – 整形,长整形,浮点型,字符串,或者列表。要替换的值。如果值是字典,那么值会被忽略,to_replace必须是一个从列名(字符串)到要替换的值的映射。要替换的值必须是一个整形,长整形,浮点型,或者字符串。
               ● value – 整形,长整形,浮点型,字符串或者列表。要替换为的值。要替换为的值必须是一个整形,长整形,浮点型,或者字符串。如果值是列表或者元组,值应该和to_replace有相同的长度。
               ● subset – 要考虑替换的列名的可选列表。在subset指定的列如果没有匹配的数据类型那么将被忽略。例如,如果值是字符串,并且subset参数包含一个非字符串的列,那么非字符串的列被忽略。

    >>> l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)]
    >>> df4 = sqlContext.createDataFrame(l4,['name','age','height'])
    >>> df4.na.replace(10, 20).show()
    +-----+----+------+
    | name| age|height|
    +-----+----+------+
    |Alice|  20|    80|
    |  Bob|   5|  null|
    |  Tom|null|  null|
    | null|null|  null|
    +-----+----+------+
    >>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
    +----+----+------+
    |name| age|height|
    +----+----+------+
    |   A|  10|    80|
    |   B|   5|  null|
    | Tom|null|  null|
    |null|null|  null|
    +----+----+------+

    3.47 rollup(*cols)

    使用指定的列为当前的DataFrame创建一个多维汇总, 这样可以聚合这些数据。

    >>> l=[('Alice',2,80),('Bob',5,None)]
    >>> df = sqlContext.createDataFrame(l,['name','age','height'])
    >>> df.rollup('name', df.age).count().show()
    +-----+----+-----+
    | name| age|count|
    +-----+----+-----+
    |Alice|null|    1|
    |  Bob|   5|    1|
    |  Bob|null|    1|
    | null|null|    2|
    |Alice|   2|    1|
    +-----+----+-----+

    3.48 sample(withReplacement, fraction, seed=None)

    返回DataFrame的子集采样。

    >>> df.sample(False, 0.5, 42).count()
    2

    3.49 sampleBy(col, fractions, seed=None)

    根据每个层次上给出的分数,返回没有替换的分层样本。
    返回没有替换的分层抽样 基于每层给定的一小部分 在给定的每层的片段
    参数:● col – 定义层的列
               ● fractions – 每层的抽样数。如果没有指定层, 将其数目视为0.
               ● seed – 随机数
    返回值: 返回代表分层样本的新的DataFrame

    >>> from pyspark.sql.functions import col
    >>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
    >>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
    >>> sampled.groupBy("key").count().orderBy("key").show()
    +---+-----+
    |key|count|
    +---+-----+
    |  0|    5|
    |  1|    9|
    +---+-----+

    3.50 save(path=None, source=None, mode='error', **options)

    保存DataFrame的数据到数据源。
    注:在1.4中已过时,使用DataFrameWriter.save()代替。

    3.51 saveAsParquetFile(path)

    保存内容为一个Parquet文件,代表这个schema。
    注:在1.4中已过时,使用DataFrameWriter.parquet() 代替。

    3.52 saveAsTable(tableName, source=None, mode='error', **options)

    将此DataFrame的内容作为表保存到数据源。
    注:在1.4中已过时,使用DataFrameWriter.saveAsTable() 代替。

    3.53  schema

    返回DataFrame的schema为types.StructType。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> df.schema
    StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))

    3.54  select(*cols)

    提供一组表达式并返回一个新的DataFrame。
    参数:● cols – 列名(字符串)或表达式(列)列表。 如果其中一列的名称为“*”,那么该列将被扩展为包括当前DataFrame中的所有列。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> df.select('*').collect()
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
    >>> df.select('name', 'age').collect()
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
    >>> df.select(df.name, (df.age + 10).alias('age')).collect()
    [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]

    3.55 selectExpr(*expr)

    投射一组SQL表达式并返回一个新的DataFrame。
    这是接受SQL表达式的select()的变体。

    >>> df.selectExpr("age * 2", "abs(age)").collect()
    [Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]

    3.56 show(n=20, truncate=True) 

    将前n行打印到控制台。
    参数:● n – 要显示的行数。
               ● truncate – 是否截断长字符串并对齐单元格。

    >>> df
    DataFrame[name: string, age: bigint]
    >>> df.show()
    +-----+---+
    | name|age|
    +-----+---+
    |Alice|  2|
    |  Bob|  5|
    +-----+---+

    3.57 sort(*cols, **kwargs)

    返回按指定列排序的新DataFrame。
    参数:● cols – 要排序的列或列名称列表。
               ● ascending – 布尔值或布尔值列表(默认为True)。 排序升序降序。 指定多个排序顺序的列表。 如果指定了列表,列表的长度必须等于列的长度。

    >>> df.sort(df.age.desc()).collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
    >>> df.sort("age", ascending=False).collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
    >>> df.orderBy(df.age.desc()).collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
    >>> from pyspark.sql.functions import *
    >>> df.sort(asc("age")).collect()
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
    >>> df.orderBy(desc("age"), "name").collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
    >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
    [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]

    3.58 sortWithinPartitions(*cols, **kwargs)

    返回一个新的DataFrame,每个分区按照指定的列排序。
    参数:● cols – 要排序的列或列名称列表。
                ascending – 布尔值或布尔值列表(默认为True)。 排序升序降序。 指定多个排序顺序的列表。 如果指定了列表,列表的长度必须等于列的长度。

    >>> df.sortWithinPartitions("age", ascending=False).show()
    +-----+---+
    | name|age|
    +-----+---+
    |Alice|  2|
    |  Bob|  5|
    +-----+---+

    3.59 stat 

    返回统计功能的DataFrameStatFunctions。

    3.60 subtract(other)

    返回一个新的DataFrame,这个DataFrame中包含的行不在另一个DataFrame中。
    这相当于SQL中的EXCEPT。

    3.61 take(num)

    返回前num行的行列表

    >>> df.take(2)
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]

    3.62 toDF(*cols)

    返回一个新类:具有新的指定列名称的DataFrame。
    参数:● cols – 新列名列表(字符串)。

    >>> df.toDF('f1', 'f2').collect()
    [Row(f1=u'Alice', f2=2), Row(f1=u'Bob', f2=5)]

    3.63 toJSON(use_unicode=True)

    将DataFrame转换为字符串的RDD。
    每行都将转换为JSON格式作为返回的RDD中的一个元素。

    >>> df.toJSON().first()
    u'{"name":"Alice","age":2}'

    3.64 toPandas()

    将此DataFrame的内容返回为Pandas pandas.DataFrame。
    这只有在pandas安装和可用的情况下才可用。

    >>> df.toPandas()  
       age   name
    0    2  Alice
    1    5    Bob

    3.65 unionAll(other)

    返回包含在这个frame和另一个frame的行的联合的新DataFrame。
    这相当于SQL中的UNION ALL。

    3.66 unpersist(blocking=True)

    将DataFrame标记为非持久性,并从内存和磁盘中删除所有的块。

    3.67 where(condition)

    使用给定表达式过滤行。
    where()是filter()的别名。
    参数:● condition – 一个布尔类型的列或一个SQL表达式的字符串。

    >>> l=[('Alice',2),('Bob',5)]
    >>> df = sqlContext.createDataFrame(l,['name','age'])
    >>> df.filter(df.age > 3).collect()
    [Row(name=u'Bob', age=5)]
    >>> df.where(df.age == 2).collect()
    [Row(name=u'Alice', age=2)]
    
    >>> df.filter("age > 3").collect()
    [Row(name=u'Bob', age=5)]
    >>> df.where("age = 2").collect()
    [Row(name=u'Alice', age=2)]

    3.68 withColumn(colName, col)

    通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
    参数:● colName – 字符串,新列的名称
               ● col – 新列的列表达式

    >>> df.withColumn('age2', df.age + 2).collect()
    [Row(name=u'Alice', age=2, age2=4), Row(name=u'Bob', age=5, age2=7)]

    3.69 withColumnRenamed(existing, new)

    通过重命名现有列来返回新的DataFrame。
    参数:● existing – 字符串,要重命名的现有列的名称
               ● col – 字符串,列的新名称

    >>> df.withColumnRenamed('age', 'age2').collect()
    [Row(name=u'Alice', age2=2), Row(name=u'Bob', age2=5)]

    3.70 write

    用于将DataFrame的内容保存到外部存储的接口。
    返回:DataFrameWriter

  • 相关阅读:
    linux下启动和关闭网卡命令及DHCP上网
    python 编码问题
    paddlepaddle
    Convolutional Neural Network Architectures for Matching Natural Language Sentences
    deep learning RNN
    Learning Structured Representation for Text Classification via Reinforcement Learning 学习笔记
    Python IO密集型任务、计算密集型任务,以及多线程、多进程
    EM 算法最好的解释
    tensorflow 调参过程
    tensorflow 学习纪录(持续更新)
  • 原文地址:https://www.cnblogs.com/wonglu/p/7784825.html
Copyright © 2011-2022 走看看