zoukankan      html  css  js  c++  java
  • PySpark—DataFrame笔记

    本人CSDN同篇文章:PySpark—DataFrame笔记
     DataFrame基础 + 示例,为了自查方便汇总了关于PySpark-dataframe相关知识点,集合了很多篇博客和知乎内容,结合了自身实践,加上了更多示例和讲解方便理解,本文内容较多配合目录看更方便。

     如有任何问题或者文章错误欢迎大家留言批评指正,感谢阅读。

    什么是DataFrame?

    DataFrames通常是指本质上是表格形式的数据结构。它代表行,每个行都包含许多观察值。
    行可以具有多种数据格式(异构),而列可以具有相同数据类型(异构)的数据。
    DataFrame通常除数据外还包含一些元数据。例如,列名和行名。
    我们可以说DataFrames是二维数据结构,类似于SQL表或电子表格。
    DataFrames用于处理大量结构化和半结构化数据
    

    连接本地spark

    from pyspark.sql import SparkSession
    
    spark = SparkSession 
        .builder 
        .appName('my_app_name') 
        .getOrCreate()
    

    Spark初始化设置

    from pyspark.sql import SparkSession
    
    # SparkSession 配置
    spark = SparkSession.builder 
        .appName("My test") 
        .getOrCreate()
    # spark.conf.set("spark.executor.memory", "1g")
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    sc = spark.sparkContext
    sc.setLogLevel("WARN")   
    

    SparkSession 介绍

    参考文章:
    SparkSession思考与总结:https://blog.csdn.net/yyt8582/article/details/81840031
    SparkSession的认识:https://www.cnblogs.com/zzhangyuhang/p/9039695.html
    spark配置:https://spark.apache.org/docs/latest/configuration.html

     (1)为何出现SparkSession

     SparkSession 本质上是SparkConf、SparkContext、SQLContext、HiveContext和StreamingContext这些环境的集合,避免使用这些来分别执行配置、Spark环境、SQL环境、Hive环境和Streaming环境。SparkSession现在是读取数据、处理元数据、配置会话和管理集群资源的入口。

     (2)SparkSession创建RDD

    from pyspark.sql.session import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession.builder.master("local") 
            .appName("My test") 
            .config("spark.some.config.option", "some-value") 
            .getOrCreate()
        sc = spark.sparkContext
    
        data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
        rdd = sc.parallelize(data)
    

     (3)SparkSession实例化参数:

     通过静态类Builder来实例化。Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置。可以通SparkSession.builder 来创建一个 SparkSession 的实例,并通过 stop 函数来停止 SparkSession。Builder又有很多方法,包括:

    Builder 的方法如下:

    (1)appName函数
    appName(String name)
    用来设置应用程序名字,会显示在Spark web UI中
    
    (2)master函数
    master(String master)
    设置Spark master URL 连接,比如"local" 设置本地运行,"local[4]"本地运行4cores,或则"spark://master:7077"运行在spark standalone 集群。
    
    (3)config函数
    这里有很多重载函数。其实从这里我们可以看出重载函数,是针对不同的情况,使用不同的函数,但是他们的功能都是用来设置配置项的。
    spark.some.config.option和some-value是configuation性质的键值对完成。例如spark configuration properties和yarn properties等
      1、.config(SparkConf conf)
      根据给定的SparkConf设置配置选项列表。
      2、config(String key, boolean value)
      设置配置项,针对值为boolean的
      3、config(String key, double value)
      设置配置项,针对值为double的
      4、config(String key, long value)
      设置配置项,针对值为long 的
      5、config(String key, String value)
      设置配置项,针对值为String 的
    
    (4)getOrCreate函数
    getOrCreate()
    获取已经得到的 SparkSession,或则如果不存在则创建一个新的基于builder选项的SparkSession
    
    (5)enableHiveSupport函数
    表示支持Hive,包括 链接持久化Hive metastore, 支持Hive serdes, 和Hive用户自定义函数
    
    (6)withExtensions函数
    withExtensions(scala.Function1<SparkSessionExtensions,scala.runtime.BoxedUnit> f)
    这允许用户添加Analyzer rules, Optimizer rules, Planning Strategies 或者customized parser.这一函数我们是不常见的。
    

    DF创建

     (1)直接创建

    # 直接创建Dataframe
    df = spark.createDataFrame([
            (1, 144.5, 5.9, 33, 'M'),
            (2, 167.2, 5.4, 45, 'M'),
            (3, 124.1, 5.2, 23, 'F'),
            (4, 144.5, 5.9, 33, 'M'),
            (5, 133.2, 5.7, 54, 'F'),
            (3, 124.1, 5.2, 23, 'F'),
            (5, 129.2, 5.3, 42, 'M'),
        ], ['id', 'weight', 'height', 'age', 'gender']) 
    

     (2)从字典创建

    df = spark.createDataFrame([{'name':'Alice','age':1},
        {'name':'Polo','age':1}]) 
    

     (3)指定schema创建

    schema = StructType([
        StructField("id", LongType(), True),   
        StructField("name", StringType(), True),
        StructField("age", LongType(), True),
        StructField("eyeColor", StringType(), True)
    ])
    df = spark.createDataFrame(csvRDD, schema)
    

     (4)读文件创建

    airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='	')
    

     (5)从pandas dataframe创建

    import pandas as pd
    from pyspark.sql import SparkSession
    
    colors = ['white','green','yellow','red','brown','pink']
    color_df=pd.DataFrame(colors,columns=['color'])
    color_df['length']=color_df['color'].apply(len)
    
    color_df=spark.createDataFrame(color_df)
    color_df.show()
    

    DF的架构查看

    df.printSchema()
    

    describe统计属性

    # 例如特定列中的行总数,其均值,标准差,列的最小值和最大值/空值计数
    df.describe(["age"]).show()
    

    dtypes查看字段类型

    # 查看列的类型 ,同pandas
    color_df.dtypes
    
    # [('color', 'string'), ('length', 'bigint')]
    

    查看列名/行数

    # 查看有哪些列 ,同pandas
    df.columns
    # ['color', 'length']
    
    # 行数
    df.count()
    
    # 列数
    len(df.columns)
    

    distinct查找列唯一值

    df.select('id').distinct()
      .rdd.map(lambda r: r[0]).collect()
    

    show显示

    # show和head函数显示数据帧的前N行
    df.show(5)
    df.head(5)
    

    统计分析

     (1)频繁项目

    # 查找每列出现次数占总的30%以上频繁项目
    df.stat.freqItems(["id", "gender"], 0.3).show()
    +------------+----------------+
    |id_freqItems|gender_freqItems|
    +------------+----------------+
    |      [5, 3]|          [M, F]|
    +------------+----------------+
    

     (2)交叉表

    # 分组统计,交叉分析
    # 计算给定列的成对频率表
    df.crosstab('Age', 'Gender').show()
    Output:
    +----------+-----+------+
    |Age_Gender|    F|     M|
    +----------+-----+------+
    |      0-17| 5083| 10019|
    |     46-50|13199| 32502|
    |     18-25|24628| 75032|
    |     36-45|27170| 82843|
    |       55+| 5083| 16421|
    |     51-55| 9894| 28607|
    |     26-35|50752|168835|
    +----------+-----+------
    

    select选择和切片筛选

     (1)列的选择

    # 选择一列的几种方式,比较麻烦,不像pandas直接用df['cols']就可以了
    # 需要在filter,select等操作符中才能使用
    color_df.select('length').show()
    color_df.select(color_df.length).show()
    color_df.select(color_df[0]).show()
    color_df.select(color_df['length']).show()
    color_df.filter(color_df['length']>=4).show()   # filter方法
    

     (2)选择几列的方法

    color_df.select('length','color').show()
    # 如果是pandas,似乎要简单些
    color_df[['length','color']]
    

     (3)多列选择和切片

    # 3.多列选择和切片
    color_df.select('length','color')
            .select(color_df['length']>4).show()
    

     (4)between 范围选择

    color_df.filter(color_df.length.between(4,5) )
            .select(color_df.color.alias('mid_length')).show()
    

     (5)联合筛选

    # 这里使用一种是 color_df.length, 另一种是color_df[0]
    color_df.filter(color_df.length>4)
            .filter(color_df[0]!='white').show()
    

     (6)filter运行类SQL

    color_df.filter("color='green'").show()
    
    color_df.filter("color like 'b%'").show()
    

     (7)where方法的SQL

    color_df.where("color like '%yellow%'").show()
    

     (8)直接使用SQL语法

    # 首先dataframe注册为临时表,然后执行SQL查询
    color_df.createOrReplaceTempView("color_df")
    spark.sql("select count(1) from color_df").show()
    

    drop删除一列

    # 删除一列
    color_df.drop('length').show()
    
    # pandas写法
    df.drop(labels=['a'],axis=1)
    

    withColumn新增/修改列

    withColumn(colName, col)
    通过为原数据框添加一个新列或替换已存在的同名列而返回一个新数据框。colName —— 是一个字符串, 为新列的名字。必须是已存在的列的名字
    col —— 为这个新列的 Column 表达式。必须是含有列的表达式。如果不是它会报错 AssertionError: col should be Column
    

     (1)新增一列

    # 列名可以是原有列,也可以是新列
    df.withColumn('page_count', df.page_count+100)
    df.withColumn('new_page_count', df.page_count+100)
    

     (2)lit新增一列常量

    # lit新增一列常量
    import pyspark.sql.functions as F
    df = df.withColumn('mark', F.lit(1))
    

    withColumnRenamed更改列名:

     (1)直接修改

    # 修改单个列名
    new_df = df.withColumnRenamed('old_name', 'new_name')
    

     (2)聚合后修改

    一、withColumnRenamed()方式修改列名:
    # 重新命名聚合后结果的列名(需要修改多个列名就跟多个:withColumnRenamed)
    # 聚合之后不修改列名则会显示:count(member_name)
    df_res.agg({'member_name': 'count', 'income': 'sum', 'num': 'sum'})
          .withColumnRenamed("count(member_name)", "member_num").show()
    
    二、利用pyspark.sql中的functions修改列名:
    from pyspark.sql import functions as F
    df_res.agg(
        F.count('member_name').alias('mem_num'),
        F.sum('num').alias('order_num'),
        F.sum("income").alias('total_income')
    ).show()
    

    cast修改列数据类型

    from pyspark.sql.types import IntegerType
    
    # 下面两种修改方式等价
    df = df.withColumn("height", df["height"].cast(IntegerType()))
    df = df.withColumn("weight", df.weight.cast('int'))
    print(df.dtypes)
    

    sort排序

     (1)单字段排序

    # spark排序
    color_df.sort('color',ascending=False).show()
    
    # pandas的排序
    df.sort_values(by='b')
    

     (2)多字段排序

    color_df.filter(color_df['length']>=4)
            .sort('length', 'color', ascending=False).show()
    

     (3)混合排序

    color_df.sort(color_df.length.desc(),color_df.color.asc())                               
            .show()
    

     (4)orderBy排序

    color_df.orderBy('length','color').show()
    

    toDF

    toDF(*cols)
    Parameters:
      cols – list of new column names (string)
      
    # 返回具有新指定列名的DataFrame
    df.toDF('f1', 'f2')
    

    DF与RDD互换

    rdd_df = df.rdd	  # DF转RDD
    df = rdd_df.toDF()  # RDD转DF
    

    DF和Pandas互换

    pandas_df = spark_df.toPandas()	
    spark_df = sqlContext.createDataFrame(pandas_df)
    

    union合并+去重:

    nodes_cust = edges.select('tx_ccl_id', 'cust_id') # 客户编号 
    nodes_cp = edges.select('tx_ccl_id', 'cp_cust_id') # 交易对手编号 
    nodes_cp = nodes_cp.withColumnRenamed('cp_cust_id', 'cust_id') # 统一节点列名 
    nodes = nodes_cust.union(nodes_cp).dropDuplicates(['cust_id'])
    

    count行数/列数

    # 行数
    df.count()
    
    # 列数
    len(df.columns)
    

    缺失值

     (1)计算列中的空值数目

    # 计算一列空值数目
    df.filter(df['col_name'].isNull()).count()
    
    # 计算每列空值数目
    for col in df.columns:
        print(col, "	", "with null values: ", 
              df.filter(df[col].isNull()).count())
    

     (2)删除有缺失值的行

    # 1、删除有缺失值的行
    df2 = df.dropna()
    
    # 2、或者
    df2 = df.na.drop()
    

     (3)平均值填充缺失值

    from pyspark.sql.functions import when
    import pyspark.sql.functions as F
    
    # 计算各个数值列的平均值
    def mean_of_pyspark_columns(df, numeric_cols):
        col_with_mean = []
        for col in numeric_cols:
            mean_value = df.select(F.avg(df[col]))
            avg_col = mean_value.columns[0]
            res = mean_value.rdd.map(lambda row: row[avg_col]).collect()
            col_with_mean.append([col, res[0]])
        return col_with_mean
    
    # 用平均值填充缺失值
    def fill_missing_with_mean(df, numeric_cols):
        col_with_mean = mean_of_pyspark_columns(df, numeric_cols)
        for col, mean in col_with_mean:
            df = df.withColumn(col, when(df[col].isNull() == True, F.lit(mean)).otherwise(df[col]))
        return df
    
    if __name__ == '__main__':
        # df需要自行创建
        numeric_cols = ['age2', 'height2']  # 需要填充空值的列
        df = fill_missing_with_mean(df, numeric_cols)  # 空值填充
        df.show()
    

    替换值

     (1)replace 全量替换

    # 替换pyspark dataframe中的任何值,而无需选择特定列
    df = df.replace('?',None)
    df = df.replace('ckd 	','ckd')
    

     (2)functions 部分替换

    # 只替换特定列中的值,则不能使用replace.而使用pyspark.sql.functions
    # 用classck的notckd替换no
    import pyspark.sql.functions as F
    df = df.withColumn('class',
                       F.when(df['class'] == 'no', F.lit('notckd'))
                        .otherwise(df['class']))
    

    groupBy + agg 聚合

     (1)agg

    agg(self, *exprs)计算聚合并将结果返回为:`DataFrame`
    可用的聚合函数有“avg”、“max”、“min”、“sum”、“count”。
    :param exprs:从列名(字符串)到聚合函数(字符串)的dict映射,
    或:类:`Column`的列表。
    
    # 官方接口示例
    >>> gdf = df.groupBy(df.name)
    >>> sorted(gdf.agg({"*": "count"}).collect())
    [Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]
    
    >>> from pyspark.sql import functions as F
    >>> sorted(gdf.agg(F.min(df.age)).collect())
    [Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]
    

     (2)sum

    # 获得两列总分数和总人数,groupBy可以根据多列分组
    df = df.groupBy('anchor_id')
    	   .agg({"live_score": "sum", "live_comment_count": "sum"})
    	   .withColumnRenamed("sum(live_score)", "total_score")
    	   .withColumnRenamed("sum(live_comment_count)", "total_people")
    

     (3)avg

    # avg方法计算平均得分
    df = df.groupBy("course_id")
           .agg({"score": "avg"})
           .withColumnRenamed("avg(score)", "avg_score")
    

     (4)count

    # count方法计算资源个数
    df = df.groupBy("course_id")
           .agg({"comment": "count"})
           .withColumnRenamed("count(comment)", "comment_count")
    

     (5)max/min

    # max取最大值min取最小值
    df = df.groupBy("org_id")
           .agg({"publish_date": "max"})
           .withColumnRenamed("max(publish_date)", "active_time")
    

     (6)collect_list()

    # collect_list()将groupBy的数据处理成列表
    from pyspark.sql import functions as F
    edges.show()
    df = edges.groupBy("tx_ccl_id").agg(F.collect_list("cust_id"))
              .withColumnRenamed("collect_list(cust_id)", "comment_list")
    df.show()
    # edge.show():结果
    +-------+----------+-----------+---------+
    |cust_id|cp_cust_id|drct_tx_amt|tx_ccl_id|
    +-------+----------+-----------+---------+
    |     18|        62|  7646.5839|        0|
    |     88|        41|  7683.6484|        0|
    |     90|        68| 16184.5801|        0|
    |     95|         5| 11888.3697|        0|
    ………………………………………………………………………………………………………………
    # df.show():结果
    +---------+---------------------+
    |tx_ccl_id|collect_list(cust_id)|
    +---------+---------------------+
    |        0| [18, 88, 90, 95, ...|
    |        1| [1077, 1011, 1004...|
    

    join 连接

    http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.join

    join(other, on=None, how=None)
    # 使用给定的连接表达式与另一个DataFrame连接
    Parameters:
    other – 连接的右端
    on – 联接列名的字符串、列名列表、联接表达式(列)或列列表。如果on是指示联接列名称的字符串或字符串列表,则该列必须存在于两边,这将执行equi联接。
    how – str, 默认inner连接. 必须是以下的其中一个: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
    

     (1)根据单个字段连接

    # 外连接
    >>> df.join(df2, df.name == df2.name, 'outer')
          .select(df.name, df2.height).collect()
    [Row(name=None, height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]
    
    # 外连接(与上个示例等价)
    >>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
    [Row(name='Tom', height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]
    

     (2)根据多个字段连接

    # 根据多个字段连接
    >>> cond = [df.name == df3.name, df.age == df3.age]
    >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
    [Row(name='Alice', age=2), Row(name='Bob', age=5)]
    
    # 根据多个字段连接(与上个示例等价)
    >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
    [Row(name='Bob', age=5)]
    

    差集/并集/交集

    # 创建数据
    df = spark.createDataFrame((
          (1, "asf"),
          (2, "2143"),
          (3, "rfds")
        )).toDF("label", "sentence")
    
    df2 = spark.createDataFrame((
          (1, "asf"),
          (2, "2143"),
          (4, "f8934y")
        )).toDF("label", "sentence")
    

     (1)差集

    newDF = df.select("sentence")
    		  .subtract(df2.select("sentence"))
    newDF.show()
    
    +--------+
    |sentence|
    +--------+
    |  f8934y|
    +--------+
    

     (2)交集

    newDF = df.select("sentence")
    		  .intersect(df2.select("sentence"))
    newDF.show()
    
    +--------+
    |sentence|
    +--------+
    |     asf|
    |    2143|
    +--------+
    

     (3)并集

    newDF = df.select("sentence")
    		  .union(df2.select("sentence"))
    newDF.show()
    
    +--------+
    |sentence|
    +--------+
    |     asf|
    |    2143|
    |  f8934y|
    |     asf|
    |    2143|
    |    rfds|
    +--------+
    

     (4)并集+去重

    newDF = df.select("sentence")
              .union(df2.select("sentence")).distinct()
    newDF.show()
    
    +--------+
    |sentence|
    +--------+
    |    rfds|
    |     asf|
    |    2143|
    |  f8934y|
    +--------+
    

    UDF自定义函数

    # 创建用户自定义函数
    # UDF对表中的每一行进行函数处理,返回新的值
    udf(f=None, returnType=StringType)
    Parameters:
      f – python函数(如果用作独立函数)
      returnType – 用户定义函数的返回类型。
    
    示例如下:
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType, StringType
    
    # 自定义函数1
    def to_upper(s):
        if s is not None:
            return s.upper()
    
    # 自定义函数2
    def add_one(x):
        if x is not None:
            return x + 1
    
    # 注册udf函数
    slen_udf = udf(lambda s: len(s), IntegerType())
    to_upper_udf = udf(to_upper, StringType())
    add_one_udf = udf(add_one, IntegerType())
    
    if __name__ == '__main__':
    	df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    	# 法一:
    	# df.select(slen("name").alias("slen(name)"), 
    				to_upper_udf(df["name"]), 
    				add_one_udf(df["age"])).show()  # 与下面一句等价
    	df.select(slen_udf("name").alias("slen(name)"), 
    			  to_upper_udf("name"), 
    			  add_one_udf("age")).show()
    
    	# 法二:
    	df = df.withColumn("slen(name)", slen_udf(df["name"]))
    	df = df.withColumn("age", add_one_udf(df["age"]))
    	df = df.withColumn("name", to_upper_udf("name"))
    	df.show()
    

    explode分割

    # 为给定数组或映射中的每个元素返回一个新行
    from pyspark.sql.functions import split, explode
    
    df = sc.parallelize([(1, 2, 3, 'a b c'),
                         (4, 5, 6, 'd e f'),
                         (7, 8, 9, 'g h i')])
            .toDF(['col1', 'col2', 'col3', 'col4'])
    df.withColumn('col4', explode(split('col4', ' '))).show()
    +----+----+----+----+
    |col1|col2|col3|col4|
    +----+----+----+----+
    |   1|   2|   3|   a|
    |   1|   2|   3|   b|
    |   1|   2|   3|   c|
    |   4|   5|   6|   d|
    |   4|   5|   6|   e|
    |   4|   5|   6|   f|
    |   7|   8|   9|   g|
    |   7|   8|   9|   h|
    |   7|   8|   9|   i|
    +----+----+----+----+
    
    # 示例二
    from pyspark.sql import Row
    from pyspark.sql.functions import explode
    
    eDF = spark.createDataFrame([Row(
        a=1, 
        intlist=[1, 2, 3], 
        mapfield={"a": "b"})])
    eDF.select(explode(eDF.intlist).alias("anInt")).show()
    +-----+
    |anInt|
    +-----+
    |    1|
    |    2|
    |    3|
    +-----+
    

    DF和python变量互转

     在sparkSQL编程的时候,经常需要获取DataFrame的信息,然后python做其他的判断或计算,比如获取dataframe的行数以判断是否需要等待,获取dataframe的某一列或第一行信息以决定下一步的处理,等等。

    ##&ensp;(1)获取第一行的值
    ```python
    # 获取第一行的值,返回普通python变量
    # 由于 first() 返回的是 Row 类型,可以看做是dict类型,
    # 在只有一列的情况下可以用 [0] 来获取值。
    value = df.select('columns_name').first()[0] 
    

     (2)获取第一行的多个值

    #获取第一行的多个值,返回普通python变量
    # first() 返回的是 Row 类型,可以看做是dict类型,用 row.col_name 来获取值
    row = df.select('col_1', 'col_2').first()
    col_1_value = row.col_1
    col_2_value = row.col_2
    

     (3)获取一列/多列的所有值

    # 获取一列的所有值,或者多列的所有值
    # collect()函数将分布式的dataframe转成local类型的 list-row格式
    rows= df.select('col_1', 'col_2').collect()
    value = [[ row.col_1, row.col_2 ] for row in rows ]
    

    不常用的一些

     (1)getField

    # 在StructField中通过名称获取字段。
    from pyspark.sql import Row
    df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
    df.select(df.r.getField("b")).show()  # 用法与下面等价
    df.select(df.r.a).show()
    

     (2)isNotNull

    # 如果当前表达式不为空,则为true
    from pyspark.sql import Row
    df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
    df.filter(df.height.isNotNull()).collect()
    # [Row(height=80, name='Tom')]
    

     (3)isNull

    # 如果当前表达式为空,则为true
    from pyspark.sql import Row
    df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
    df.filter(df.height.isNull()).collect()
    # [Row(height=None, name='Alice')]
    

     (4)isin

    # 如果自变量的求值包含该表达式的值,则该表达式为true
    df[df.name.isin("Bob", "Mike")].collect()
    # [Row(age=5, name='Bob')]
    df[df.age.isin([1, 2, 3])].collect()
    # [Row(age=2, name='Alice')]
    

     (5)like

    # Column根据SQL LIKE匹配返回布尔值。
    df.filter(df.name.like('Al%')).collect()
    # [Row(age=2, name='Alice')]
    

     (6)otherwise

    otherwise(value)
    # 计算条件列表,并返回多个可能的结果表达式之一,如果otherwise()未调用,则为不匹配的条件返回None
    
    from pyspark.sql import functions as F
    df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
    +-----+-------------------------------------+
    | name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
    +-----+-------------------------------------+
    |Alice|                                    0|
    |  Bob|                                    1|
    +-----+-------------------------------------+
    

     (7)when

    when(condition, value)
    Parameters:
      condition – 布尔Column表达式
      value – 文字值或Column表达式
    # 计算条件列表,并返回多个可能的结果表达式之一.如果otherwise()未调用,则为不匹配的条件返回None
    
    from pyspark.sql import functions as F
    >>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
    +-----+------------------------------------------------------------+
    | name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
    +-----+------------------------------------------------------------+
    |Alice|                                                          -1|
    |  Bob|                                                           1|
    +-----+------------------------------------------------------------+
    

    后续需要整理学习:

    PySpark SQL常用语法:https://www.jianshu.com/p/177cbcb1cb6f
    PySpark︱DataFrame操作指南:增/删/改/查/合并/统计与数据处理:
    https://blog.csdn.net/sinat_26917383/article/details/80500349?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2

    参考文章:

    pyspark官方文档:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
    关于spark的博客集合:https://blog.csdn.net/qq_32023541/column/info/19893
    pyspark配置config:https://www.cnblogs.com/Tw1st-Fate/p/11094344.html

    DataFrame基础:https://blog.csdn.net/suzyu12345/article/details/79673493
    DataFrame:https://www.jianshu.com/p/cb0fec7a4f6d
    列累积求和:https://blog.csdn.net/XnCSD/article/details/90676259
    dataframe,排序并排名:https://blog.csdn.net/a1272899331/article/details/90268141

    pyspark sql使用总结:https://blog.csdn.net/weixin_44053979/article/details/89296224
    pyspark 分组取前几个:https://blog.csdn.net/weixin_40161254/article/details/88817225
    Dataframe使用的坑 与 经历:https://cloud.tencent.com/developer/article/1435995
    Pandas 和 PySpark 的 DataFrame 相互转换:http://fech.in/2018/pyspark_and_pandas/
    读写dataframe:https://blog.csdn.net/suzyu12345/article/details/79673473#31-写到csv
    DataFrame操作指南:增/删/改/查/合并/统计与数据处理:
    https://blog.csdn.net/sinat_26917383/article/details/80500349?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2

  • 相关阅读:
    网络虚拟化中的 offload 技术:LSO/LRO、GSO/GRO、TSO/UFO、RSS、VXLAN
    pve5下的iptables案例分析-无法访问input相关端口
    linux mail相关-涉及windows下nslookup使用
    qemu的vnc选项-参考至qemu wiki
    windows任务管理器-线程优先级
    在proxmox中模拟树莓派
    unzip命令的使用
    fdisk分区遇到的问题-涉及lv逻辑卷扩容
    fail2ban(1)
    Different Integers 牛客网暑期ACM多校训练营(第一场) J 离线+线状数组或者主席树
  • 原文地址:https://www.cnblogs.com/liaowuhen1314/p/12792202.html
Copyright © 2011-2022 走看看