zoukankan      html  css  js  c++  java
  • 知识点-Spark小节

    Spark处理字符串日期的max和min的方式
    Spark处理数据存储到Hive的方式
    Spark处理新增列的方式map和udf、functions
    Spark处理行转列pivot的使用
    Python 3.5.3
    Spark1.6.2

    欢迎访问个人主页博客

    Spark处理字符串日期的max和min的方式

    一般是字符串类型的日期在使用Spark的agg求max时,是不正确的,API显示只支持数值型的max、min
    hive的SQL查询引擎是支持字符串日期的max和min的

    字符串日期转为时间戳再聚合

    unix_timestamp

    public static Column unix_timestamp(Column s)
    Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail.
    Parameters:
    s - (undocumented)
    Returns:
    (undocumented)
    Since:
    1.5.0
    from pyspark.sql import functions as F
    
    df.withColumn('startuptime_stamp', F.unix_timestamp('startuptime'))
    使用HiveSQL
    select device_id, max(startuptime) as max_startuptime, min(startuptime) as min_startuptime from app_table group by device_id

    Spark处理数据存储到Hive的方式

    通常Spark任务处理后的结果数据会存储到Hive表中,可以先保存至HDFS目录再load、最方便还是直接使用临时表和HiveContext插入数据

    saveAsTextFile & load data

    repartition根据实际文件大小进行调整,数据比较小时,保存成一个文件

    df.map(lambda r: func).repartition(1).saveAsTextFile(data_dir)

    先删除分区,如果已经存在的话
    再覆盖原来的数据【方便重新重复跑或修复数据】
    此处使用shell,也可使用HiveContext的sql

    alter table app_table drop if exists partition(datestr='$day_01');
    load data inpath 'hdfs://xx/out/$day_01' overwrite into table app_table partition(datestr='$day_01');
    hivectx.sql & insert
    app_table1_df.registerTempTable("app_table1_tmp")
    app_table2_df.registerTempTable("app_table2_tmp")
    hivectx.sql("set spark.sql.shuffle.partitions=1")
    hivectx.sql("alter table app_table drop if exists partition(datestr='%s')" % daystr)
    hivectx.sql("insert overwrite table app_table partition(datestr='%s') select * from app_table1_tmp" % daystr)
    hivectx.sql("insert into app_table partition(datestr='%s') select * from app_table2_tmp" % daystr)

    Spark处理新增列的方式map和udf、functions

    Spark在处理数据转换时,通常需要使用map、flatmap等操作,其中使用map会产生新的列或修改某列字段的值
    Spark同样支持自定义函数UDF以及提供了类似Hive内置函数的各种各样的处理函数

    map

    需要定义函数和StructType
    忽略数值判断细节和精度等

    from pyspark.sql.types import *
    
    def a_func(_):
        return _['id'], _['cnt1'], _['cnt2'], _['cnt1'] / (_['cnt1'] + _['cnt1'])
    
    a_schema = StructType([
        StructField('id', StringType(), True),
        StructField('cnt1', IntegerType(), True),
        StructField('cnt2', IntegerType(), True),
        StructField('cnt1_rate', IntegerType(), True)
    ])
    
    a_new_df = sqlctx.createDataFrame(df.select('id', 'cnt1', 'cnt2').map(a_func), a_schema)
    udf

    需要定义函数和UDF
    忽略数值判断细节和精度等

    def a_func(cnt1, cnt2):
        return cnt1 / (cnt1 + cnt2)
    
    a_udf = F.udf(a_func, IntegerType())
    
    a_new_df = df.withColumn('cnt1_rate', a_udf(df['cnt1'], df['cnt2'])
    functions

    处理类似日期字符串的格式转换、等等等
    https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html

    Spark处理行转列pivot的使用

    在使用SQL查询数据时,很多情况下需要将行转为列,以有利于数据的展示和不同维度需求的利用
    一般可采用子查询case when、连续join、字段补全union的形式
    Spark的DataFrame中可以通过GroupedData的pivot函数来实现

    df.groupBy(['course_name']).pivot('daystr').sum('score')
    
    df.groupBy(['course_name']).pivot('daystr').count()

    转换前

    daystr course_name score
    2017-11-15 yuwen 1
    2017-11-15 yuwen 1
    2017-11-15 shuxue 1
    2017-11-15 yingyu 2
    2017-11-16 yuwen 1
    2017-11-16 shuxue 1
    2017-11-16 yingyu 2
    

    转换后

    course_name 2017-11-15 2017-11-16
    yuwen 2 1
    shuxue 1 1
    yingyu 2 2
    
    course_name 2017-11-15 2017-11-16
    yuwen 2 1
    shuxue 1 1
    yingyu 1 1

    原文地址:https://blog.icocoro.me/2017/11/16/1711-zhishidian-spark%E5%B0%8F%E8%8A%8201/index.html
  • 相关阅读:
    085 Maximal Rectangle 最大矩形
    084 Largest Rectangle in Histogram 柱状图中最大的矩形
    083 Remove Duplicates from Sorted List 有序链表中删除重复的结点
    082 Remove Duplicates from Sorted List II 有序的链表删除重复的结点 II
    081 Search in Rotated Sorted Array II 搜索旋转排序数组 ||
    080 Remove Duplicates from Sorted Array II 从排序阵列中删除重复 II
    079 Word Search 单词搜索
    078 Subsets 子集
    bzoj2326: [HNOI2011]数学作业
    bzoj2152: 聪聪可可
  • 原文地址:https://www.cnblogs.com/once/p/7966105.html
Copyright © 2011-2022 走看看