zoukankan      html  css  js  c++  java
  • PySpark 自定义聚合函数 UDAF

    自定义聚合函数 UDAF 目前有点麻烦,PandasUDFType.GROUPED_AGG 在2.3.2的版本中不知怎么回事,不能使用!

    这样的话只能曲线救国了!

     

    PySpark有一组很好的聚合函数(例如,count,countDistinct,min,max,avg,sum),但这些并不适用于所有情况(特别是如果你试图避免代价高昂的Shuffle操作)。

    PySpark目前有pandas_udfs,它可以创建自定义聚合器,但是你一次只能“应用”一个pandas_udf。如果你想使用多个,你必须预先形成多个groupBys ......并且避免那些改组。

    在这篇文章中,我描述了一个小黑客,它使您能够创建简单的python UDF,它们对聚合数据起作用(此功能只应存在于Scala中!)。

    1 
    2 
    3 
    4 
    5 
    6 
    7 
    8
    
    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    
    a = sc.parallelize([[1, 'a'],
                        [1, 'b'],
                        [1, 'b'],
                        [2, 'c']]).toDF(['id', 'value'])
    a.show()
    
    ID
    1 '一个'
    1 'B'
    1 'B'
    2 'C'

    我使用collect_list将给定组中的所有数据放入一行。我打印下面这个操作的输出。

    1
    
    a.groupBy('id').agg(F.collect_list('value').alias('value_list')).show()
    
    IDVALUE_LIST
    1 ['a','b','b']
    2 ['C']

    然后我创建一个UDF,它将计算这些列表中字母'a'的所有出现(这可以很容易地在没有UDF的情况下完成,但是你明白了)。此UDF包含collect_list,因此它作用于collect_list的输出。

    1 
    2 
    3 
    4 
    5 
    6 
    7 
    8 
    9 
    10 
    11
    
    def find_a(x):
      """Count 'a's in list."""
      output_count = 0
      for i in x:
        if i == 'a':
          output_count += 1
      return output_count
    
    find_a_udf = F.udf(find_a, T.IntegerType())
    
    a.groupBy('id').agg(find_a_udf(F.collect_list('value')).alias('a_count')).show()
    
    IDA_COUNT
    1 1
    2 0

    我们去!作用于聚合数据的UDF!接下来,我展示了这种方法的强大功能,结合何时让我们控制哪些数据进入F.collect_list。

    首先,让我们创建一个带有额外列的数据框。

    1 
    2 
    3 
    4 
    5 
    6 
    7 
    8 
    9
    
    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    
    a = sc.parallelize([[1, 1, 'a'],
                        [1, 2, 'a'],
                        [1, 1, 'b'],
                        [1, 2, 'b'],
                        [2, 1, 'c']]).toDF(['id', 'value1', 'value2'])
    a.show()
    
    ID值1值2
    1 1 '一个'
    1 2 '一个'
    1 1 'B'
    1 2 'B'
    2 1 'C'

    请注意,我如何在collect_list中包含一个when。请注意,UDF仍然包含collect_list。

    1
    
    a.groupBy('id').agg(find_a_udf( F.collect_list(F.when(F.col('value1') == 1, F.col('value2')))).alias('a_count')).show()
    
    IDA_COUNT
    1 1
    2 0

    https://danvatterott.com/blog/2018/09/06/python-aggregate-udfs-in-pyspark/

    还有一种做法就是用pandas_udf, series 添加一列分组变量然后去重。

    还有就是使用输入输出都是dataframe 的 pandas_udf

  • 相关阅读:
    Vue-router笔记
    webpack及其配置
    高阶函数+组件化开发
    Es6语法+v-on参数相关+vue虚拟dom
    英语资源及其APP推荐
    Qt获取ip地址
    QT创建和使用动态链接库
    error C2664: “strcmp”: 不能将参数 1 从“WCHAR [260]”转换为“const char *”
    Vue|VUE router 导航重复点击报错的问题解决方案
    Vue|触发路由跳转
  • 原文地址:https://www.cnblogs.com/leebxo/p/13492626.html
Copyright © 2011-2022 走看看